authentik.providers.oauth2.tests.test_backchannel_logout

Test OAuth2 Back-Channel Logout implementation

  1"""Test OAuth2 Back-Channel Logout implementation"""
  2
  3from unittest.mock import Mock, patch
  4
  5import jwt
  6from django.test import RequestFactory
  7from django.utils import timezone
  8from dramatiq.results.errors import ResultFailure
  9from requests import Response
 10from requests.exceptions import HTTPError, Timeout
 11
 12from authentik.core.models import Application, AuthenticatedSession, Session
 13from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 14from authentik.lib.generators import generate_id
 15from authentik.providers.oauth2.id_token import hash_session_key
 16from authentik.providers.oauth2.models import (
 17    AccessToken,
 18    OAuth2LogoutMethod,
 19    OAuth2Provider,
 20    RedirectURI,
 21    RedirectURIMatchingMode,
 22    RefreshToken,
 23)
 24from authentik.providers.oauth2.tasks import send_backchannel_logout_request
 25from authentik.providers.oauth2.tests.utils import OAuthTestCase
 26from authentik.providers.oauth2.utils import create_logout_token
 27
 28
 29class TestBackChannelLogout(OAuthTestCase):
 30    """Test Back-Channel Logout functionality"""
 31
 32    def setUp(self) -> None:
 33        super().setUp()
 34        self.factory = RequestFactory()
 35        self.user = create_test_admin_user()
 36        self.app = Application.objects.create(name=generate_id(), slug="test-app")
 37        self.provider = OAuth2Provider.objects.create(
 38            name=generate_id(),
 39            authorization_flow=create_test_flow(),
 40            redirect_uris=[
 41                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
 42            ],
 43            signing_key=self.keypair,
 44        )
 45        self.app.provider = self.provider
 46        self.app.save()
 47
 48    def _create_session(self, session_key=None):
 49        """Create a session with the given key or a generated one"""
 50        session_key = session_key or f"session-{generate_id()}"
 51        session = Session.objects.create(
 52            session_key=session_key,
 53            expires=timezone.now() + timezone.timedelta(hours=1),
 54            last_ip="255.255.255.255",
 55        )
 56        auth_session = AuthenticatedSession.objects.create(
 57            session=session,
 58            user=self.user,
 59        )
 60        return auth_session
 61
 62    def _create_token(
 63        self, provider, user, session=None, token_type="access", token_id=None
 64    ):  # nosec
 65        """Create a token of the specified type"""
 66        token_id = token_id or f"{token_type}-token-{generate_id()}"
 67        kwargs = {
 68            "provider": provider,
 69            "user": user,
 70            "session": session,
 71            "token": token_id,
 72            "_id_token": "{}",
 73            "auth_time": timezone.now(),
 74        }
 75
 76        if token_type == "access":  # nosec
 77            return AccessToken.objects.create(**kwargs)
 78        else:  # refresh
 79            return RefreshToken.objects.create(**kwargs)
 80
 81    def _create_provider(self, name=None):
 82        """Create an OAuth2 provider"""
 83        name = name or f"provider-{generate_id()}"
 84        provider = OAuth2Provider.objects.create(
 85            name=name,
 86            authorization_flow=create_test_flow(),
 87            redirect_uris=[
 88                RedirectURI(RedirectURIMatchingMode.STRICT, f"http://{name}/callback"),
 89            ],
 90            signing_key=self.keypair,
 91        )
 92        return provider
 93
 94    def _create_logout_token(
 95        self,
 96        provider: OAuth2Provider | None = None,
 97        session_id: str | None = None,
 98        sub: str | None = None,
 99    ):
100        """Create a logout token with the given parameters"""
101        provider = provider or self.provider
102
103        # Create a token with the same issuer that the view will expect
104        # Use the same request object that will be used in the test
105        request = self.factory.post("/backchannel_logout")
106
107        return create_logout_token(
108            iss=provider.get_issuer(request),
109            provider=provider,
110            session_key=session_id,
111            sub=sub,
112        )
113
114    def _decode_token(self, token, provider=None):
115        """Helper to decode and validate a JWT token"""
116        decoded = self._decode_token_complete(token, provider)
117        return decoded["payload"]
118
119    def _decode_token_complete(self, token, provider=None):
120        """Helper to decode and validate a JWT token into a header, and payload dict"""
121        provider = provider or self.provider
122        key, alg = provider.jwt_key
123        if alg != "HS256":
124            key = provider.signing_key.public_key
125        return jwt.decode_complete(
126            token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False}
127        )
128
129    def test_create_logout_token_variants(self):
130        """Test creating logout tokens with different combinations of parameters"""
131        # Test case 1: With session_id only
132        session_id = "test-session-123"
133        token1 = self._create_logout_token(session_id=session_id)
134        decoded1 = self._decode_token(token1)
135
136        self.assertIn("iss", decoded1)
137        self.assertEqual(decoded1["aud"], self.provider.client_id)
138        self.assertIn("iat", decoded1)
139        self.assertIn("jti", decoded1)
140        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
141        self.assertIn("events", decoded1)
142        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
143        self.assertNotIn("sub", decoded1)
144
145        # Test case 2: With sub only
146        sub = "user-123"
147        token2 = self._create_logout_token(sub=sub)
148        decoded2 = self._decode_token(token2)
149
150        self.assertEqual(decoded2["sub"], sub)
151        self.assertIn("events", decoded2)
152        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
153        self.assertNotIn("sid", decoded2)
154
155        # Test case 3: With both session_id and sub
156        token3 = self._create_logout_token(session_id=session_id, sub=sub)
157        decoded3 = self._decode_token(token3)
158
159        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
160        self.assertEqual(decoded3["sub"], sub)
161        self.assertIn("events", decoded3)
162
163    def test_create_logout_token_header_type(self):
164        """Test creating logout tokens and checking if the token header type is correct"""
165        session_id = "test-session-123"
166        token1 = self._create_logout_token(session_id=session_id)
167
168        decoded = self._decode_token_complete(token1)
169
170        self.assertIsNotNone(decoded["header"])
171        self.assertEqual(decoded["header"]["typ"], "logout+jwt")
172
173    @patch("authentik.providers.oauth2.tasks.get_http_session")
174    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
175        """Test various scenarios for backchannel logout request task"""
176        # Setup provider with backchannel logout URI
177
178        self.provider.logout_uri = "http://testserver/backchannel_logout"
179        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
180        self.provider.save()
181
182        # Setup mock session and response
183        mock_session = Mock()
184        mock_get_session.return_value = mock_session
185        mock_response = Mock(spec=Response)
186        mock_response.status_code = 200
187        mock_response.raise_for_status.return_value = None  # No exception for successful request
188        mock_session.post.return_value = mock_response
189
190        result = send_backchannel_logout_request.send(
191            self.provider.pk, "http://testserver", sub="test-user-uid"
192        )
193        self.assertTrue(result)
194        mock_session.post.assert_called_once()
195        call_args = mock_session.post.call_args
196        self.assertIn("logout_token", call_args[1]["data"])
197        self.assertEqual(
198            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
199        )
200
201        # Scenario 2: Failed request (400 response) - should raise exception
202        mock_session.post.reset_mock()
203        error_response = Mock(spec=Response)
204        error_response.status_code = 400
205        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
206        mock_session.post.return_value = error_response
207        with self.assertRaises(ResultFailure):
208            send_backchannel_logout_request.send(
209                self.provider.pk, "http://testserver", sub="test-user-uid"
210            ).get_result()
211
212        # Scenario 3: No URI configured
213        mock_session.post.reset_mock()
214        self.provider.logout_uri = ""
215        self.provider.save()
216        result = send_backchannel_logout_request.send(
217            self.provider.pk, "http://testserver", sub="test-user-uid"
218        ).get_result()
219        self.assertIsNone(result)
220        mock_session.post.assert_not_called()
221
222        # Scenario 4: No sub provided - should fail
223        result = send_backchannel_logout_request.send(
224            self.provider.pk, "http://testserver"
225        ).get_result()
226        self.assertIsNone(result)
227
228        # Scenario 5: Non-existent provider
229        result = send_backchannel_logout_request.send(
230            99999, "http://testserver", sub="test-user-uid"
231        ).get_result()
232        self.assertIsNone(result)
233
234        # Scenario 6: Request timeout
235        mock_session.post.side_effect = Timeout("Request timed out")
236        self.provider.logout_uri = "http://testserver/backchannel_logout"
237        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
238        self.provider.save()
239        with self.assertRaises(ResultFailure):
240            send_backchannel_logout_request.send(
241                self.provider.pk, "http://testserver", sub="test-user-uid"
242            ).get_result()
class TestBackChannelLogout(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 30class TestBackChannelLogout(OAuthTestCase):
 31    """Test Back-Channel Logout functionality"""
 32
 33    def setUp(self) -> None:
 34        super().setUp()
 35        self.factory = RequestFactory()
 36        self.user = create_test_admin_user()
 37        self.app = Application.objects.create(name=generate_id(), slug="test-app")
 38        self.provider = OAuth2Provider.objects.create(
 39            name=generate_id(),
 40            authorization_flow=create_test_flow(),
 41            redirect_uris=[
 42                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
 43            ],
 44            signing_key=self.keypair,
 45        )
 46        self.app.provider = self.provider
 47        self.app.save()
 48
 49    def _create_session(self, session_key=None):
 50        """Create a session with the given key or a generated one"""
 51        session_key = session_key or f"session-{generate_id()}"
 52        session = Session.objects.create(
 53            session_key=session_key,
 54            expires=timezone.now() + timezone.timedelta(hours=1),
 55            last_ip="255.255.255.255",
 56        )
 57        auth_session = AuthenticatedSession.objects.create(
 58            session=session,
 59            user=self.user,
 60        )
 61        return auth_session
 62
 63    def _create_token(
 64        self, provider, user, session=None, token_type="access", token_id=None
 65    ):  # nosec
 66        """Create a token of the specified type"""
 67        token_id = token_id or f"{token_type}-token-{generate_id()}"
 68        kwargs = {
 69            "provider": provider,
 70            "user": user,
 71            "session": session,
 72            "token": token_id,
 73            "_id_token": "{}",
 74            "auth_time": timezone.now(),
 75        }
 76
 77        if token_type == "access":  # nosec
 78            return AccessToken.objects.create(**kwargs)
 79        else:  # refresh
 80            return RefreshToken.objects.create(**kwargs)
 81
 82    def _create_provider(self, name=None):
 83        """Create an OAuth2 provider"""
 84        name = name or f"provider-{generate_id()}"
 85        provider = OAuth2Provider.objects.create(
 86            name=name,
 87            authorization_flow=create_test_flow(),
 88            redirect_uris=[
 89                RedirectURI(RedirectURIMatchingMode.STRICT, f"http://{name}/callback"),
 90            ],
 91            signing_key=self.keypair,
 92        )
 93        return provider
 94
 95    def _create_logout_token(
 96        self,
 97        provider: OAuth2Provider | None = None,
 98        session_id: str | None = None,
 99        sub: str | None = None,
100    ):
101        """Create a logout token with the given parameters"""
102        provider = provider or self.provider
103
104        # Create a token with the same issuer that the view will expect
105        # Use the same request object that will be used in the test
106        request = self.factory.post("/backchannel_logout")
107
108        return create_logout_token(
109            iss=provider.get_issuer(request),
110            provider=provider,
111            session_key=session_id,
112            sub=sub,
113        )
114
115    def _decode_token(self, token, provider=None):
116        """Helper to decode and validate a JWT token"""
117        decoded = self._decode_token_complete(token, provider)
118        return decoded["payload"]
119
120    def _decode_token_complete(self, token, provider=None):
121        """Helper to decode and validate a JWT token into a header, and payload dict"""
122        provider = provider or self.provider
123        key, alg = provider.jwt_key
124        if alg != "HS256":
125            key = provider.signing_key.public_key
126        return jwt.decode_complete(
127            token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False}
128        )
129
130    def test_create_logout_token_variants(self):
131        """Test creating logout tokens with different combinations of parameters"""
132        # Test case 1: With session_id only
133        session_id = "test-session-123"
134        token1 = self._create_logout_token(session_id=session_id)
135        decoded1 = self._decode_token(token1)
136
137        self.assertIn("iss", decoded1)
138        self.assertEqual(decoded1["aud"], self.provider.client_id)
139        self.assertIn("iat", decoded1)
140        self.assertIn("jti", decoded1)
141        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
142        self.assertIn("events", decoded1)
143        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
144        self.assertNotIn("sub", decoded1)
145
146        # Test case 2: With sub only
147        sub = "user-123"
148        token2 = self._create_logout_token(sub=sub)
149        decoded2 = self._decode_token(token2)
150
151        self.assertEqual(decoded2["sub"], sub)
152        self.assertIn("events", decoded2)
153        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
154        self.assertNotIn("sid", decoded2)
155
156        # Test case 3: With both session_id and sub
157        token3 = self._create_logout_token(session_id=session_id, sub=sub)
158        decoded3 = self._decode_token(token3)
159
160        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
161        self.assertEqual(decoded3["sub"], sub)
162        self.assertIn("events", decoded3)
163
164    def test_create_logout_token_header_type(self):
165        """Test creating logout tokens and checking if the token header type is correct"""
166        session_id = "test-session-123"
167        token1 = self._create_logout_token(session_id=session_id)
168
169        decoded = self._decode_token_complete(token1)
170
171        self.assertIsNotNone(decoded["header"])
172        self.assertEqual(decoded["header"]["typ"], "logout+jwt")
173
174    @patch("authentik.providers.oauth2.tasks.get_http_session")
175    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
176        """Test various scenarios for backchannel logout request task"""
177        # Setup provider with backchannel logout URI
178
179        self.provider.logout_uri = "http://testserver/backchannel_logout"
180        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
181        self.provider.save()
182
183        # Setup mock session and response
184        mock_session = Mock()
185        mock_get_session.return_value = mock_session
186        mock_response = Mock(spec=Response)
187        mock_response.status_code = 200
188        mock_response.raise_for_status.return_value = None  # No exception for successful request
189        mock_session.post.return_value = mock_response
190
191        result = send_backchannel_logout_request.send(
192            self.provider.pk, "http://testserver", sub="test-user-uid"
193        )
194        self.assertTrue(result)
195        mock_session.post.assert_called_once()
196        call_args = mock_session.post.call_args
197        self.assertIn("logout_token", call_args[1]["data"])
198        self.assertEqual(
199            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
200        )
201
202        # Scenario 2: Failed request (400 response) - should raise exception
203        mock_session.post.reset_mock()
204        error_response = Mock(spec=Response)
205        error_response.status_code = 400
206        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
207        mock_session.post.return_value = error_response
208        with self.assertRaises(ResultFailure):
209            send_backchannel_logout_request.send(
210                self.provider.pk, "http://testserver", sub="test-user-uid"
211            ).get_result()
212
213        # Scenario 3: No URI configured
214        mock_session.post.reset_mock()
215        self.provider.logout_uri = ""
216        self.provider.save()
217        result = send_backchannel_logout_request.send(
218            self.provider.pk, "http://testserver", sub="test-user-uid"
219        ).get_result()
220        self.assertIsNone(result)
221        mock_session.post.assert_not_called()
222
223        # Scenario 4: No sub provided - should fail
224        result = send_backchannel_logout_request.send(
225            self.provider.pk, "http://testserver"
226        ).get_result()
227        self.assertIsNone(result)
228
229        # Scenario 5: Non-existent provider
230        result = send_backchannel_logout_request.send(
231            99999, "http://testserver", sub="test-user-uid"
232        ).get_result()
233        self.assertIsNone(result)
234
235        # Scenario 6: Request timeout
236        mock_session.post.side_effect = Timeout("Request timed out")
237        self.provider.logout_uri = "http://testserver/backchannel_logout"
238        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
239        self.provider.save()
240        with self.assertRaises(ResultFailure):
241            send_backchannel_logout_request.send(
242                self.provider.pk, "http://testserver", sub="test-user-uid"
243            ).get_result()

Test Back-Channel Logout functionality

def setUp(self) -> None:
33    def setUp(self) -> None:
34        super().setUp()
35        self.factory = RequestFactory()
36        self.user = create_test_admin_user()
37        self.app = Application.objects.create(name=generate_id(), slug="test-app")
38        self.provider = OAuth2Provider.objects.create(
39            name=generate_id(),
40            authorization_flow=create_test_flow(),
41            redirect_uris=[
42                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
43            ],
44            signing_key=self.keypair,
45        )
46        self.app.provider = self.provider
47        self.app.save()

Hook method for setting up the test fixture before exercising it.

def test_create_logout_token_variants(self):
130    def test_create_logout_token_variants(self):
131        """Test creating logout tokens with different combinations of parameters"""
132        # Test case 1: With session_id only
133        session_id = "test-session-123"
134        token1 = self._create_logout_token(session_id=session_id)
135        decoded1 = self._decode_token(token1)
136
137        self.assertIn("iss", decoded1)
138        self.assertEqual(decoded1["aud"], self.provider.client_id)
139        self.assertIn("iat", decoded1)
140        self.assertIn("jti", decoded1)
141        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
142        self.assertIn("events", decoded1)
143        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
144        self.assertNotIn("sub", decoded1)
145
146        # Test case 2: With sub only
147        sub = "user-123"
148        token2 = self._create_logout_token(sub=sub)
149        decoded2 = self._decode_token(token2)
150
151        self.assertEqual(decoded2["sub"], sub)
152        self.assertIn("events", decoded2)
153        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
154        self.assertNotIn("sid", decoded2)
155
156        # Test case 3: With both session_id and sub
157        token3 = self._create_logout_token(session_id=session_id, sub=sub)
158        decoded3 = self._decode_token(token3)
159
160        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
161        self.assertEqual(decoded3["sub"], sub)
162        self.assertIn("events", decoded3)

Test creating logout tokens with different combinations of parameters

def test_create_logout_token_header_type(self):
164    def test_create_logout_token_header_type(self):
165        """Test creating logout tokens and checking if the token header type is correct"""
166        session_id = "test-session-123"
167        token1 = self._create_logout_token(session_id=session_id)
168
169        decoded = self._decode_token_complete(token1)
170
171        self.assertIsNotNone(decoded["header"])
172        self.assertEqual(decoded["header"]["typ"], "logout+jwt")

Test creating logout tokens and checking if the token header type is correct

@patch('authentik.providers.oauth2.tasks.get_http_session')
def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
174    @patch("authentik.providers.oauth2.tasks.get_http_session")
175    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
176        """Test various scenarios for backchannel logout request task"""
177        # Setup provider with backchannel logout URI
178
179        self.provider.logout_uri = "http://testserver/backchannel_logout"
180        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
181        self.provider.save()
182
183        # Setup mock session and response
184        mock_session = Mock()
185        mock_get_session.return_value = mock_session
186        mock_response = Mock(spec=Response)
187        mock_response.status_code = 200
188        mock_response.raise_for_status.return_value = None  # No exception for successful request
189        mock_session.post.return_value = mock_response
190
191        result = send_backchannel_logout_request.send(
192            self.provider.pk, "http://testserver", sub="test-user-uid"
193        )
194        self.assertTrue(result)
195        mock_session.post.assert_called_once()
196        call_args = mock_session.post.call_args
197        self.assertIn("logout_token", call_args[1]["data"])
198        self.assertEqual(
199            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
200        )
201
202        # Scenario 2: Failed request (400 response) - should raise exception
203        mock_session.post.reset_mock()
204        error_response = Mock(spec=Response)
205        error_response.status_code = 400
206        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
207        mock_session.post.return_value = error_response
208        with self.assertRaises(ResultFailure):
209            send_backchannel_logout_request.send(
210                self.provider.pk, "http://testserver", sub="test-user-uid"
211            ).get_result()
212
213        # Scenario 3: No URI configured
214        mock_session.post.reset_mock()
215        self.provider.logout_uri = ""
216        self.provider.save()
217        result = send_backchannel_logout_request.send(
218            self.provider.pk, "http://testserver", sub="test-user-uid"
219        ).get_result()
220        self.assertIsNone(result)
221        mock_session.post.assert_not_called()
222
223        # Scenario 4: No sub provided - should fail
224        result = send_backchannel_logout_request.send(
225            self.provider.pk, "http://testserver"
226        ).get_result()
227        self.assertIsNone(result)
228
229        # Scenario 5: Non-existent provider
230        result = send_backchannel_logout_request.send(
231            99999, "http://testserver", sub="test-user-uid"
232        ).get_result()
233        self.assertIsNone(result)
234
235        # Scenario 6: Request timeout
236        mock_session.post.side_effect = Timeout("Request timed out")
237        self.provider.logout_uri = "http://testserver/backchannel_logout"
238        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
239        self.provider.save()
240        with self.assertRaises(ResultFailure):
241            send_backchannel_logout_request.send(
242                self.provider.pk, "http://testserver", sub="test-user-uid"
243            ).get_result()

Test various scenarios for backchannel logout request task