authentik.providers.oauth2.tests.test_backchannel_logout
Test OAuth2 Back-Channel Logout implementation
1"""Test OAuth2 Back-Channel Logout implementation""" 2 3from unittest.mock import Mock, patch 4 5import jwt 6from django.test import RequestFactory 7from django.utils import timezone 8from dramatiq.results.errors import ResultFailure 9from requests import Response 10from requests.exceptions import HTTPError, Timeout 11 12from authentik.core.models import Application, AuthenticatedSession, Session 13from authentik.core.tests.utils import create_test_admin_user, create_test_flow 14from authentik.lib.generators import generate_id 15from authentik.providers.oauth2.id_token import hash_session_key 16from authentik.providers.oauth2.models import ( 17 AccessToken, 18 OAuth2LogoutMethod, 19 OAuth2Provider, 20 RedirectURI, 21 RedirectURIMatchingMode, 22 RefreshToken, 23) 24from authentik.providers.oauth2.tasks import send_backchannel_logout_request 25from authentik.providers.oauth2.tests.utils import OAuthTestCase 26from authentik.providers.oauth2.utils import create_logout_token 27 28 29class TestBackChannelLogout(OAuthTestCase): 30 """Test Back-Channel Logout functionality""" 31 32 def setUp(self) -> None: 33 super().setUp() 34 self.factory = RequestFactory() 35 self.user = create_test_admin_user() 36 self.app = Application.objects.create(name=generate_id(), slug="test-app") 37 self.provider = OAuth2Provider.objects.create( 38 name=generate_id(), 39 authorization_flow=create_test_flow(), 40 redirect_uris=[ 41 RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"), 42 ], 43 signing_key=self.keypair, 44 ) 45 self.app.provider = self.provider 46 self.app.save() 47 48 def _create_session(self, session_key=None): 49 """Create a session with the given key or a generated one""" 50 session_key = session_key or f"session-{generate_id()}" 51 session = Session.objects.create( 52 session_key=session_key, 53 expires=timezone.now() + timezone.timedelta(hours=1), 54 last_ip="255.255.255.255", 55 ) 56 auth_session = AuthenticatedSession.objects.create( 57 session=session, 58 user=self.user, 59 ) 60 return auth_session 61 62 def _create_token( 63 self, provider, user, session=None, token_type="access", token_id=None 64 ): # nosec 65 """Create a token of the specified type""" 66 token_id = token_id or f"{token_type}-token-{generate_id()}" 67 kwargs = { 68 "provider": provider, 69 "user": user, 70 "session": session, 71 "token": token_id, 72 "_id_token": "{}", 73 "auth_time": timezone.now(), 74 } 75 76 if token_type == "access": # nosec 77 return AccessToken.objects.create(**kwargs) 78 else: # refresh 79 return RefreshToken.objects.create(**kwargs) 80 81 def _create_provider(self, name=None): 82 """Create an OAuth2 provider""" 83 name = name or f"provider-{generate_id()}" 84 provider = OAuth2Provider.objects.create( 85 name=name, 86 authorization_flow=create_test_flow(), 87 redirect_uris=[ 88 RedirectURI(RedirectURIMatchingMode.STRICT, f"http://{name}/callback"), 89 ], 90 signing_key=self.keypair, 91 ) 92 return provider 93 94 def _create_logout_token( 95 self, 96 provider: OAuth2Provider | None = None, 97 session_id: str | None = None, 98 sub: str | None = None, 99 ): 100 """Create a logout token with the given parameters""" 101 provider = provider or self.provider 102 103 # Create a token with the same issuer that the view will expect 104 # Use the same request object that will be used in the test 105 request = self.factory.post("/backchannel_logout") 106 107 return create_logout_token( 108 iss=provider.get_issuer(request), 109 provider=provider, 110 session_key=session_id, 111 sub=sub, 112 ) 113 114 def _decode_token(self, token, provider=None): 115 """Helper to decode and validate a JWT token""" 116 decoded = self._decode_token_complete(token, provider) 117 return decoded["payload"] 118 119 def _decode_token_complete(self, token, provider=None): 120 """Helper to decode and validate a JWT token into a header, and payload dict""" 121 provider = provider or self.provider 122 key, alg = provider.jwt_key 123 if alg != "HS256": 124 key = provider.signing_key.public_key 125 return jwt.decode_complete( 126 token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False} 127 ) 128 129 def test_create_logout_token_variants(self): 130 """Test creating logout tokens with different combinations of parameters""" 131 # Test case 1: With session_id only 132 session_id = "test-session-123" 133 token1 = self._create_logout_token(session_id=session_id) 134 decoded1 = self._decode_token(token1) 135 136 self.assertIn("iss", decoded1) 137 self.assertEqual(decoded1["aud"], self.provider.client_id) 138 self.assertIn("iat", decoded1) 139 self.assertIn("jti", decoded1) 140 self.assertEqual(decoded1["sid"], hash_session_key(session_id)) 141 self.assertIn("events", decoded1) 142 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"]) 143 self.assertNotIn("sub", decoded1) 144 145 # Test case 2: With sub only 146 sub = "user-123" 147 token2 = self._create_logout_token(sub=sub) 148 decoded2 = self._decode_token(token2) 149 150 self.assertEqual(decoded2["sub"], sub) 151 self.assertIn("events", decoded2) 152 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"]) 153 self.assertNotIn("sid", decoded2) 154 155 # Test case 3: With both session_id and sub 156 token3 = self._create_logout_token(session_id=session_id, sub=sub) 157 decoded3 = self._decode_token(token3) 158 159 self.assertEqual(decoded3["sid"], hash_session_key(session_id)) 160 self.assertEqual(decoded3["sub"], sub) 161 self.assertIn("events", decoded3) 162 163 def test_create_logout_token_header_type(self): 164 """Test creating logout tokens and checking if the token header type is correct""" 165 session_id = "test-session-123" 166 token1 = self._create_logout_token(session_id=session_id) 167 168 decoded = self._decode_token_complete(token1) 169 170 self.assertIsNotNone(decoded["header"]) 171 self.assertEqual(decoded["header"]["typ"], "logout+jwt") 172 173 @patch("authentik.providers.oauth2.tasks.get_http_session") 174 def test_send_backchannel_logout_request_scenarios(self, mock_get_session): 175 """Test various scenarios for backchannel logout request task""" 176 # Setup provider with backchannel logout URI 177 178 self.provider.logout_uri = "http://testserver/backchannel_logout" 179 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 180 self.provider.save() 181 182 # Setup mock session and response 183 mock_session = Mock() 184 mock_get_session.return_value = mock_session 185 mock_response = Mock(spec=Response) 186 mock_response.status_code = 200 187 mock_response.raise_for_status.return_value = None # No exception for successful request 188 mock_session.post.return_value = mock_response 189 190 result = send_backchannel_logout_request.send( 191 self.provider.pk, "http://testserver", sub="test-user-uid" 192 ) 193 self.assertTrue(result) 194 mock_session.post.assert_called_once() 195 call_args = mock_session.post.call_args 196 self.assertIn("logout_token", call_args[1]["data"]) 197 self.assertEqual( 198 call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded" 199 ) 200 201 # Scenario 2: Failed request (400 response) - should raise exception 202 mock_session.post.reset_mock() 203 error_response = Mock(spec=Response) 204 error_response.status_code = 400 205 error_response.raise_for_status.side_effect = HTTPError("HTTP 400") 206 mock_session.post.return_value = error_response 207 with self.assertRaises(ResultFailure): 208 send_backchannel_logout_request.send( 209 self.provider.pk, "http://testserver", sub="test-user-uid" 210 ).get_result() 211 212 # Scenario 3: No URI configured 213 mock_session.post.reset_mock() 214 self.provider.logout_uri = "" 215 self.provider.save() 216 result = send_backchannel_logout_request.send( 217 self.provider.pk, "http://testserver", sub="test-user-uid" 218 ).get_result() 219 self.assertIsNone(result) 220 mock_session.post.assert_not_called() 221 222 # Scenario 4: No sub provided - should fail 223 result = send_backchannel_logout_request.send( 224 self.provider.pk, "http://testserver" 225 ).get_result() 226 self.assertIsNone(result) 227 228 # Scenario 5: Non-existent provider 229 result = send_backchannel_logout_request.send( 230 99999, "http://testserver", sub="test-user-uid" 231 ).get_result() 232 self.assertIsNone(result) 233 234 # Scenario 6: Request timeout 235 mock_session.post.side_effect = Timeout("Request timed out") 236 self.provider.logout_uri = "http://testserver/backchannel_logout" 237 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 238 self.provider.save() 239 with self.assertRaises(ResultFailure): 240 send_backchannel_logout_request.send( 241 self.provider.pk, "http://testserver", sub="test-user-uid" 242 ).get_result()
30class TestBackChannelLogout(OAuthTestCase): 31 """Test Back-Channel Logout functionality""" 32 33 def setUp(self) -> None: 34 super().setUp() 35 self.factory = RequestFactory() 36 self.user = create_test_admin_user() 37 self.app = Application.objects.create(name=generate_id(), slug="test-app") 38 self.provider = OAuth2Provider.objects.create( 39 name=generate_id(), 40 authorization_flow=create_test_flow(), 41 redirect_uris=[ 42 RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"), 43 ], 44 signing_key=self.keypair, 45 ) 46 self.app.provider = self.provider 47 self.app.save() 48 49 def _create_session(self, session_key=None): 50 """Create a session with the given key or a generated one""" 51 session_key = session_key or f"session-{generate_id()}" 52 session = Session.objects.create( 53 session_key=session_key, 54 expires=timezone.now() + timezone.timedelta(hours=1), 55 last_ip="255.255.255.255", 56 ) 57 auth_session = AuthenticatedSession.objects.create( 58 session=session, 59 user=self.user, 60 ) 61 return auth_session 62 63 def _create_token( 64 self, provider, user, session=None, token_type="access", token_id=None 65 ): # nosec 66 """Create a token of the specified type""" 67 token_id = token_id or f"{token_type}-token-{generate_id()}" 68 kwargs = { 69 "provider": provider, 70 "user": user, 71 "session": session, 72 "token": token_id, 73 "_id_token": "{}", 74 "auth_time": timezone.now(), 75 } 76 77 if token_type == "access": # nosec 78 return AccessToken.objects.create(**kwargs) 79 else: # refresh 80 return RefreshToken.objects.create(**kwargs) 81 82 def _create_provider(self, name=None): 83 """Create an OAuth2 provider""" 84 name = name or f"provider-{generate_id()}" 85 provider = OAuth2Provider.objects.create( 86 name=name, 87 authorization_flow=create_test_flow(), 88 redirect_uris=[ 89 RedirectURI(RedirectURIMatchingMode.STRICT, f"http://{name}/callback"), 90 ], 91 signing_key=self.keypair, 92 ) 93 return provider 94 95 def _create_logout_token( 96 self, 97 provider: OAuth2Provider | None = None, 98 session_id: str | None = None, 99 sub: str | None = None, 100 ): 101 """Create a logout token with the given parameters""" 102 provider = provider or self.provider 103 104 # Create a token with the same issuer that the view will expect 105 # Use the same request object that will be used in the test 106 request = self.factory.post("/backchannel_logout") 107 108 return create_logout_token( 109 iss=provider.get_issuer(request), 110 provider=provider, 111 session_key=session_id, 112 sub=sub, 113 ) 114 115 def _decode_token(self, token, provider=None): 116 """Helper to decode and validate a JWT token""" 117 decoded = self._decode_token_complete(token, provider) 118 return decoded["payload"] 119 120 def _decode_token_complete(self, token, provider=None): 121 """Helper to decode and validate a JWT token into a header, and payload dict""" 122 provider = provider or self.provider 123 key, alg = provider.jwt_key 124 if alg != "HS256": 125 key = provider.signing_key.public_key 126 return jwt.decode_complete( 127 token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False} 128 ) 129 130 def test_create_logout_token_variants(self): 131 """Test creating logout tokens with different combinations of parameters""" 132 # Test case 1: With session_id only 133 session_id = "test-session-123" 134 token1 = self._create_logout_token(session_id=session_id) 135 decoded1 = self._decode_token(token1) 136 137 self.assertIn("iss", decoded1) 138 self.assertEqual(decoded1["aud"], self.provider.client_id) 139 self.assertIn("iat", decoded1) 140 self.assertIn("jti", decoded1) 141 self.assertEqual(decoded1["sid"], hash_session_key(session_id)) 142 self.assertIn("events", decoded1) 143 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"]) 144 self.assertNotIn("sub", decoded1) 145 146 # Test case 2: With sub only 147 sub = "user-123" 148 token2 = self._create_logout_token(sub=sub) 149 decoded2 = self._decode_token(token2) 150 151 self.assertEqual(decoded2["sub"], sub) 152 self.assertIn("events", decoded2) 153 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"]) 154 self.assertNotIn("sid", decoded2) 155 156 # Test case 3: With both session_id and sub 157 token3 = self._create_logout_token(session_id=session_id, sub=sub) 158 decoded3 = self._decode_token(token3) 159 160 self.assertEqual(decoded3["sid"], hash_session_key(session_id)) 161 self.assertEqual(decoded3["sub"], sub) 162 self.assertIn("events", decoded3) 163 164 def test_create_logout_token_header_type(self): 165 """Test creating logout tokens and checking if the token header type is correct""" 166 session_id = "test-session-123" 167 token1 = self._create_logout_token(session_id=session_id) 168 169 decoded = self._decode_token_complete(token1) 170 171 self.assertIsNotNone(decoded["header"]) 172 self.assertEqual(decoded["header"]["typ"], "logout+jwt") 173 174 @patch("authentik.providers.oauth2.tasks.get_http_session") 175 def test_send_backchannel_logout_request_scenarios(self, mock_get_session): 176 """Test various scenarios for backchannel logout request task""" 177 # Setup provider with backchannel logout URI 178 179 self.provider.logout_uri = "http://testserver/backchannel_logout" 180 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 181 self.provider.save() 182 183 # Setup mock session and response 184 mock_session = Mock() 185 mock_get_session.return_value = mock_session 186 mock_response = Mock(spec=Response) 187 mock_response.status_code = 200 188 mock_response.raise_for_status.return_value = None # No exception for successful request 189 mock_session.post.return_value = mock_response 190 191 result = send_backchannel_logout_request.send( 192 self.provider.pk, "http://testserver", sub="test-user-uid" 193 ) 194 self.assertTrue(result) 195 mock_session.post.assert_called_once() 196 call_args = mock_session.post.call_args 197 self.assertIn("logout_token", call_args[1]["data"]) 198 self.assertEqual( 199 call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded" 200 ) 201 202 # Scenario 2: Failed request (400 response) - should raise exception 203 mock_session.post.reset_mock() 204 error_response = Mock(spec=Response) 205 error_response.status_code = 400 206 error_response.raise_for_status.side_effect = HTTPError("HTTP 400") 207 mock_session.post.return_value = error_response 208 with self.assertRaises(ResultFailure): 209 send_backchannel_logout_request.send( 210 self.provider.pk, "http://testserver", sub="test-user-uid" 211 ).get_result() 212 213 # Scenario 3: No URI configured 214 mock_session.post.reset_mock() 215 self.provider.logout_uri = "" 216 self.provider.save() 217 result = send_backchannel_logout_request.send( 218 self.provider.pk, "http://testserver", sub="test-user-uid" 219 ).get_result() 220 self.assertIsNone(result) 221 mock_session.post.assert_not_called() 222 223 # Scenario 4: No sub provided - should fail 224 result = send_backchannel_logout_request.send( 225 self.provider.pk, "http://testserver" 226 ).get_result() 227 self.assertIsNone(result) 228 229 # Scenario 5: Non-existent provider 230 result = send_backchannel_logout_request.send( 231 99999, "http://testserver", sub="test-user-uid" 232 ).get_result() 233 self.assertIsNone(result) 234 235 # Scenario 6: Request timeout 236 mock_session.post.side_effect = Timeout("Request timed out") 237 self.provider.logout_uri = "http://testserver/backchannel_logout" 238 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 239 self.provider.save() 240 with self.assertRaises(ResultFailure): 241 send_backchannel_logout_request.send( 242 self.provider.pk, "http://testserver", sub="test-user-uid" 243 ).get_result()
Test Back-Channel Logout functionality
def
setUp(self) -> None:
33 def setUp(self) -> None: 34 super().setUp() 35 self.factory = RequestFactory() 36 self.user = create_test_admin_user() 37 self.app = Application.objects.create(name=generate_id(), slug="test-app") 38 self.provider = OAuth2Provider.objects.create( 39 name=generate_id(), 40 authorization_flow=create_test_flow(), 41 redirect_uris=[ 42 RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"), 43 ], 44 signing_key=self.keypair, 45 ) 46 self.app.provider = self.provider 47 self.app.save()
Hook method for setting up the test fixture before exercising it.
def
test_create_logout_token_variants(self):
130 def test_create_logout_token_variants(self): 131 """Test creating logout tokens with different combinations of parameters""" 132 # Test case 1: With session_id only 133 session_id = "test-session-123" 134 token1 = self._create_logout_token(session_id=session_id) 135 decoded1 = self._decode_token(token1) 136 137 self.assertIn("iss", decoded1) 138 self.assertEqual(decoded1["aud"], self.provider.client_id) 139 self.assertIn("iat", decoded1) 140 self.assertIn("jti", decoded1) 141 self.assertEqual(decoded1["sid"], hash_session_key(session_id)) 142 self.assertIn("events", decoded1) 143 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"]) 144 self.assertNotIn("sub", decoded1) 145 146 # Test case 2: With sub only 147 sub = "user-123" 148 token2 = self._create_logout_token(sub=sub) 149 decoded2 = self._decode_token(token2) 150 151 self.assertEqual(decoded2["sub"], sub) 152 self.assertIn("events", decoded2) 153 self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"]) 154 self.assertNotIn("sid", decoded2) 155 156 # Test case 3: With both session_id and sub 157 token3 = self._create_logout_token(session_id=session_id, sub=sub) 158 decoded3 = self._decode_token(token3) 159 160 self.assertEqual(decoded3["sid"], hash_session_key(session_id)) 161 self.assertEqual(decoded3["sub"], sub) 162 self.assertIn("events", decoded3)
Test creating logout tokens with different combinations of parameters
def
test_create_logout_token_header_type(self):
164 def test_create_logout_token_header_type(self): 165 """Test creating logout tokens and checking if the token header type is correct""" 166 session_id = "test-session-123" 167 token1 = self._create_logout_token(session_id=session_id) 168 169 decoded = self._decode_token_complete(token1) 170 171 self.assertIsNotNone(decoded["header"]) 172 self.assertEqual(decoded["header"]["typ"], "logout+jwt")
Test creating logout tokens and checking if the token header type is correct
@patch('authentik.providers.oauth2.tasks.get_http_session')
def
test_send_backchannel_logout_request_scenarios(self, mock_get_session):
174 @patch("authentik.providers.oauth2.tasks.get_http_session") 175 def test_send_backchannel_logout_request_scenarios(self, mock_get_session): 176 """Test various scenarios for backchannel logout request task""" 177 # Setup provider with backchannel logout URI 178 179 self.provider.logout_uri = "http://testserver/backchannel_logout" 180 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 181 self.provider.save() 182 183 # Setup mock session and response 184 mock_session = Mock() 185 mock_get_session.return_value = mock_session 186 mock_response = Mock(spec=Response) 187 mock_response.status_code = 200 188 mock_response.raise_for_status.return_value = None # No exception for successful request 189 mock_session.post.return_value = mock_response 190 191 result = send_backchannel_logout_request.send( 192 self.provider.pk, "http://testserver", sub="test-user-uid" 193 ) 194 self.assertTrue(result) 195 mock_session.post.assert_called_once() 196 call_args = mock_session.post.call_args 197 self.assertIn("logout_token", call_args[1]["data"]) 198 self.assertEqual( 199 call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded" 200 ) 201 202 # Scenario 2: Failed request (400 response) - should raise exception 203 mock_session.post.reset_mock() 204 error_response = Mock(spec=Response) 205 error_response.status_code = 400 206 error_response.raise_for_status.side_effect = HTTPError("HTTP 400") 207 mock_session.post.return_value = error_response 208 with self.assertRaises(ResultFailure): 209 send_backchannel_logout_request.send( 210 self.provider.pk, "http://testserver", sub="test-user-uid" 211 ).get_result() 212 213 # Scenario 3: No URI configured 214 mock_session.post.reset_mock() 215 self.provider.logout_uri = "" 216 self.provider.save() 217 result = send_backchannel_logout_request.send( 218 self.provider.pk, "http://testserver", sub="test-user-uid" 219 ).get_result() 220 self.assertIsNone(result) 221 mock_session.post.assert_not_called() 222 223 # Scenario 4: No sub provided - should fail 224 result = send_backchannel_logout_request.send( 225 self.provider.pk, "http://testserver" 226 ).get_result() 227 self.assertIsNone(result) 228 229 # Scenario 5: Non-existent provider 230 result = send_backchannel_logout_request.send( 231 99999, "http://testserver", sub="test-user-uid" 232 ).get_result() 233 self.assertIsNone(result) 234 235 # Scenario 6: Request timeout 236 mock_session.post.side_effect = Timeout("Request timed out") 237 self.provider.logout_uri = "http://testserver/backchannel_logout" 238 self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL 239 self.provider.save() 240 with self.assertRaises(ResultFailure): 241 send_backchannel_logout_request.send( 242 self.provider.pk, "http://testserver", sub="test-user-uid" 243 ).get_result()
Test various scenarios for backchannel logout request task