authentik.providers.oauth2.tests.test_backchannel_logout
Test OAuth2 Back-Channel Logout implementation
"""Test OAuth2 Back-Channel Logout implementation"""

from unittest.mock import Mock, patch

import jwt
from django.test import RequestFactory
from dramatiq.results.errors import ResultFailure
from requests import Response
from requests.exceptions import HTTPError, Timeout

from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.lib.generators import generate_id
from authentik.providers.oauth2.id_token import hash_session_key
from authentik.providers.oauth2.models import (
    OAuth2LogoutMethod,
    OAuth2Provider,
    RedirectURI,
    RedirectURIMatchingMode,
)
from authentik.providers.oauth2.tasks import send_backchannel_logout_request
from authentik.providers.oauth2.tests.utils import OAuthTestCase
from authentik.providers.oauth2.utils import create_logout_token


class TestBackChannelLogout(OAuthTestCase):
    """Test Back-Channel Logout functionality"""

    def setUp(self) -> None:
        """Create the user, application and OAuth2 provider shared by all tests"""
        super().setUp()
        self.factory = RequestFactory()
        self.user = create_test_admin_user()
        self.app = Application.objects.create(name=generate_id(), slug="test-app")

        provider_name = generate_id()
        flow = create_test_flow()
        callback = RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
        self.provider = OAuth2Provider.objects.create(
            name=provider_name,
            authorization_flow=flow,
            redirect_uris=[callback],
            signing_key=self.keypair,
        )

        # Attach the freshly created provider to the application
        self.app.provider = self.provider
        self.app.save()

    def _create_logout_token(
        self,
        provider: OAuth2Provider | None = None,
        session_id: str | None = None,
        sub: str | None = None,
    ):
        """Create a logout token for `provider` (defaults to `self.provider`)

        The issuer is derived from a POST request to `/backchannel_logout`
        built with the test's request factory; `session_id` is forwarded to
        `create_logout_token` as `session_key`.
        """
        target = provider or self.provider
        request = self.factory.post("/backchannel_logout")
        issuer = target.get_issuer(request)
        return create_logout_token(
            iss=issuer,
            provider=target,
            session_key=session_id,
            sub=sub,
        )

    def _decode_token(self, token, provider=None):
        """Helper to decode a JWT token and return only its payload"""
        return self._decode_token_complete(token, provider)["payload"]

    def _decode_token_complete(self, token, provider=None):
        """Helper to decode a JWT token into a dict with header and payload

        Expiry and audience verification are switched off. For any algorithm
        other than HS256 the signing key's public key is used for verification.
        """
        target = provider or self.provider
        key, alg = target.jwt_key
        if alg != "HS256":
            key = target.signing_key.public_key
        decode_options = {"verify_exp": False, "verify_aud": False}
        return jwt.decode_complete(token, key, algorithms=[alg], options=decode_options)

    def test_create_logout_token_variants(self):
        """Test creating logout tokens with different combinations of parameters"""
        logout_event = "http://schemas.openid.net/event/backchannel-logout"
        session_id = "test-session-123"
        sub = "user-123"

        # Variant 1: session_id only -> "sid" present, "sub" absent
        sid_claims = self._decode_token(self._create_logout_token(session_id=session_id))
        self.assertIn("iss", sid_claims)
        self.assertEqual(sid_claims["aud"], self.provider.client_id)
        self.assertIn("iat", sid_claims)
        self.assertIn("jti", sid_claims)
        self.assertEqual(sid_claims["sid"], hash_session_key(session_id))
        self.assertIn("events", sid_claims)
        self.assertIn(logout_event, sid_claims["events"])
        self.assertNotIn("sub", sid_claims)

        # Variant 2: sub only -> "sub" present, "sid" absent
        sub_claims = self._decode_token(self._create_logout_token(sub=sub))
        self.assertEqual(sub_claims["sub"], sub)
        self.assertIn("events", sub_claims)
        self.assertIn(logout_event, sub_claims["events"])
        self.assertNotIn("sid", sub_claims)

        # Variant 3: both session_id and sub -> both claims present
        both_claims = self._decode_token(
            self._create_logout_token(session_id=session_id, sub=sub)
        )
        self.assertEqual(both_claims["sid"], hash_session_key(session_id))
        self.assertEqual(both_claims["sub"], sub)
        self.assertIn("events", both_claims)

    def test_create_logout_token_header_type(self):
        """Test creating logout tokens and checking if the token header type is correct"""
        token = self._create_logout_token(session_id="test-session-123")
        header = self._decode_token_complete(token)["header"]

        self.assertIsNotNone(header)
        self.assertEqual(header["typ"], "logout+jwt")

    @patch("authentik.providers.oauth2.tasks.get_http_session")
    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
        """Test various scenarios for backchannel logout request task

        The HTTP session used by the task is replaced with a mock, so no real
        request is sent; each scenario reconfigures the mock or the provider.
        """
        issuer = "http://testserver"
        sub = "test-user-uid"

        # Setup provider with backchannel logout URI
        self.provider.logout_uri = "http://testserver/backchannel_logout"
        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
        self.provider.save()

        # Setup mock session and response
        mock_session = Mock()
        mock_get_session.return_value = mock_session
        mock_response = Mock(spec=Response)
        mock_response.status_code = 200
        mock_response.raise_for_status.return_value = None  # No exception for successful request
        mock_session.post.return_value = mock_response

        # Scenario 1: Successful request
        # `.send()` returns a handle, not the task's return value, so asserting
        # its truthiness proves nothing. `.get_result()` raises ResultFailure
        # (as scenarios 2 and 6 rely on) if the task errored.
        send_backchannel_logout_request.send(self.provider.pk, issuer, sub=sub).get_result()
        mock_session.post.assert_called_once()
        post_kwargs = mock_session.post.call_args[1]
        self.assertIn("logout_token", post_kwargs["data"])
        self.assertEqual(
            post_kwargs["headers"]["Content-Type"], "application/x-www-form-urlencoded"
        )

        # Scenario 2: Failed request (400 response) - should raise exception
        mock_session.post.reset_mock()
        error_response = Mock(spec=Response)
        error_response.status_code = 400
        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
        mock_session.post.return_value = error_response
        with self.assertRaises(ResultFailure):
            send_backchannel_logout_request.send(
                self.provider.pk, issuer, sub=sub
            ).get_result()

        # Scenario 3: No URI configured
        mock_session.post.reset_mock()
        self.provider.logout_uri = ""
        self.provider.save()
        result = send_backchannel_logout_request.send(
            self.provider.pk, issuer, sub=sub
        ).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 4: No sub provided - should fail
        # NOTE(review): logout_uri is still empty here (from scenario 3), so this
        # may exit on the missing URI rather than the missing sub - confirm
        # against the task implementation.
        result = send_backchannel_logout_request.send(self.provider.pk, issuer).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 5: Non-existent provider
        result = send_backchannel_logout_request.send(99999, issuer, sub=sub).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 6: Request timeout
        mock_session.post.side_effect = Timeout("Request timed out")
        self.provider.logout_uri = "http://testserver/backchannel_logout"
        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
        self.provider.save()
        with self.assertRaises(ResultFailure):
            send_backchannel_logout_request.send(
                self.provider.pk, issuer, sub=sub
            ).get_result()
class TestBackChannelLogout(OAuthTestCase):
    """Test Back-Channel Logout functionality"""

    def setUp(self) -> None:
        """Create the user, application and OAuth2 provider shared by all tests"""
        super().setUp()
        self.factory = RequestFactory()
        self.user = create_test_admin_user()
        self.app = Application.objects.create(name=generate_id(), slug="test-app")

        provider_name = generate_id()
        flow = create_test_flow()
        callback = RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
        self.provider = OAuth2Provider.objects.create(
            name=provider_name,
            authorization_flow=flow,
            redirect_uris=[callback],
            signing_key=self.keypair,
        )

        # Attach the freshly created provider to the application
        self.app.provider = self.provider
        self.app.save()

    def _create_logout_token(
        self,
        provider: OAuth2Provider | None = None,
        session_id: str | None = None,
        sub: str | None = None,
    ):
        """Create a logout token for `provider` (defaults to `self.provider`)

        The issuer is derived from a POST request to `/backchannel_logout`
        built with the test's request factory; `session_id` is forwarded to
        `create_logout_token` as `session_key`.
        """
        target = provider or self.provider
        request = self.factory.post("/backchannel_logout")
        issuer = target.get_issuer(request)
        return create_logout_token(
            iss=issuer,
            provider=target,
            session_key=session_id,
            sub=sub,
        )

    def _decode_token(self, token, provider=None):
        """Helper to decode a JWT token and return only its payload"""
        return self._decode_token_complete(token, provider)["payload"]

    def _decode_token_complete(self, token, provider=None):
        """Helper to decode a JWT token into a dict with header and payload

        Expiry and audience verification are switched off. For any algorithm
        other than HS256 the signing key's public key is used for verification.
        """
        target = provider or self.provider
        key, alg = target.jwt_key
        if alg != "HS256":
            key = target.signing_key.public_key
        decode_options = {"verify_exp": False, "verify_aud": False}
        return jwt.decode_complete(token, key, algorithms=[alg], options=decode_options)

    def test_create_logout_token_variants(self):
        """Test creating logout tokens with different combinations of parameters"""
        logout_event = "http://schemas.openid.net/event/backchannel-logout"
        session_id = "test-session-123"
        sub = "user-123"

        # Variant 1: session_id only -> "sid" present, "sub" absent
        sid_claims = self._decode_token(self._create_logout_token(session_id=session_id))
        self.assertIn("iss", sid_claims)
        self.assertEqual(sid_claims["aud"], self.provider.client_id)
        self.assertIn("iat", sid_claims)
        self.assertIn("jti", sid_claims)
        self.assertEqual(sid_claims["sid"], hash_session_key(session_id))
        self.assertIn("events", sid_claims)
        self.assertIn(logout_event, sid_claims["events"])
        self.assertNotIn("sub", sid_claims)

        # Variant 2: sub only -> "sub" present, "sid" absent
        sub_claims = self._decode_token(self._create_logout_token(sub=sub))
        self.assertEqual(sub_claims["sub"], sub)
        self.assertIn("events", sub_claims)
        self.assertIn(logout_event, sub_claims["events"])
        self.assertNotIn("sid", sub_claims)

        # Variant 3: both session_id and sub -> both claims present
        both_claims = self._decode_token(
            self._create_logout_token(session_id=session_id, sub=sub)
        )
        self.assertEqual(both_claims["sid"], hash_session_key(session_id))
        self.assertEqual(both_claims["sub"], sub)
        self.assertIn("events", both_claims)

    def test_create_logout_token_header_type(self):
        """Test creating logout tokens and checking if the token header type is correct"""
        token = self._create_logout_token(session_id="test-session-123")
        header = self._decode_token_complete(token)["header"]

        self.assertIsNotNone(header)
        self.assertEqual(header["typ"], "logout+jwt")

    @patch("authentik.providers.oauth2.tasks.get_http_session")
    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
        """Test various scenarios for backchannel logout request task

        The HTTP session used by the task is replaced with a mock, so no real
        request is sent; each scenario reconfigures the mock or the provider.
        """
        issuer = "http://testserver"
        sub = "test-user-uid"

        # Setup provider with backchannel logout URI
        self.provider.logout_uri = "http://testserver/backchannel_logout"
        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
        self.provider.save()

        # Setup mock session and response
        mock_session = Mock()
        mock_get_session.return_value = mock_session
        mock_response = Mock(spec=Response)
        mock_response.status_code = 200
        mock_response.raise_for_status.return_value = None  # No exception for successful request
        mock_session.post.return_value = mock_response

        # Scenario 1: Successful request
        # `.send()` returns a handle, not the task's return value, so asserting
        # its truthiness proves nothing. `.get_result()` raises ResultFailure
        # (as scenarios 2 and 6 rely on) if the task errored.
        send_backchannel_logout_request.send(self.provider.pk, issuer, sub=sub).get_result()
        mock_session.post.assert_called_once()
        post_kwargs = mock_session.post.call_args[1]
        self.assertIn("logout_token", post_kwargs["data"])
        self.assertEqual(
            post_kwargs["headers"]["Content-Type"], "application/x-www-form-urlencoded"
        )

        # Scenario 2: Failed request (400 response) - should raise exception
        mock_session.post.reset_mock()
        error_response = Mock(spec=Response)
        error_response.status_code = 400
        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
        mock_session.post.return_value = error_response
        with self.assertRaises(ResultFailure):
            send_backchannel_logout_request.send(
                self.provider.pk, issuer, sub=sub
            ).get_result()

        # Scenario 3: No URI configured
        mock_session.post.reset_mock()
        self.provider.logout_uri = ""
        self.provider.save()
        result = send_backchannel_logout_request.send(
            self.provider.pk, issuer, sub=sub
        ).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 4: No sub provided - should fail
        # NOTE(review): logout_uri is still empty here (from scenario 3), so this
        # may exit on the missing URI rather than the missing sub - confirm
        # against the task implementation.
        result = send_backchannel_logout_request.send(self.provider.pk, issuer).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 5: Non-existent provider
        result = send_backchannel_logout_request.send(99999, issuer, sub=sub).get_result()
        self.assertIsNone(result)
        mock_session.post.assert_not_called()

        # Scenario 6: Request timeout
        mock_session.post.side_effect = Timeout("Request timed out")
        self.provider.logout_uri = "http://testserver/backchannel_logout"
        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
        self.provider.save()
        with self.assertRaises(ResultFailure):
            send_backchannel_logout_request.send(
                self.provider.pk, issuer, sub=sub
            ).get_result()
Test Back-Channel Logout functionality
def
setUp(self) -> None:
def setUp(self) -> None:
    """Create the user, application and OAuth2 provider shared by all tests"""
    super().setUp()
    self.factory = RequestFactory()
    self.user = create_test_admin_user()
    self.app = Application.objects.create(name=generate_id(), slug="test-app")

    provider_name = generate_id()
    flow = create_test_flow()
    callback = RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback")
    self.provider = OAuth2Provider.objects.create(
        name=provider_name,
        authorization_flow=flow,
        redirect_uris=[callback],
        signing_key=self.keypair,
    )

    # Attach the freshly created provider to the application
    self.app.provider = self.provider
    self.app.save()
Hook method for setting up the test fixture before exercising it.
def
test_create_logout_token_variants(self):
def test_create_logout_token_variants(self):
    """Test creating logout tokens with different combinations of parameters"""
    logout_event = "http://schemas.openid.net/event/backchannel-logout"
    session_id = "test-session-123"
    sub = "user-123"

    # Variant 1: session_id only -> "sid" present, "sub" absent
    sid_claims = self._decode_token(self._create_logout_token(session_id=session_id))
    self.assertIn("iss", sid_claims)
    self.assertEqual(sid_claims["aud"], self.provider.client_id)
    self.assertIn("iat", sid_claims)
    self.assertIn("jti", sid_claims)
    self.assertEqual(sid_claims["sid"], hash_session_key(session_id))
    self.assertIn("events", sid_claims)
    self.assertIn(logout_event, sid_claims["events"])
    self.assertNotIn("sub", sid_claims)

    # Variant 2: sub only -> "sub" present, "sid" absent
    sub_claims = self._decode_token(self._create_logout_token(sub=sub))
    self.assertEqual(sub_claims["sub"], sub)
    self.assertIn("events", sub_claims)
    self.assertIn(logout_event, sub_claims["events"])
    self.assertNotIn("sid", sub_claims)

    # Variant 3: both session_id and sub -> both claims present
    both_claims = self._decode_token(
        self._create_logout_token(session_id=session_id, sub=sub)
    )
    self.assertEqual(both_claims["sid"], hash_session_key(session_id))
    self.assertEqual(both_claims["sub"], sub)
    self.assertIn("events", both_claims)
Test creating logout tokens with different combinations of parameters
def
test_create_logout_token_header_type(self):
def test_create_logout_token_header_type(self):
    """Test creating logout tokens and checking if the token header type is correct"""
    token = self._create_logout_token(session_id="test-session-123")
    header = self._decode_token_complete(token)["header"]

    self.assertIsNotNone(header)
    self.assertEqual(header["typ"], "logout+jwt")
Test creating logout tokens and checking if the token header type is correct
@patch('authentik.providers.oauth2.tasks.get_http_session')
def
test_send_backchannel_logout_request_scenarios(self, mock_get_session):
@patch("authentik.providers.oauth2.tasks.get_http_session")
def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
    """Test various scenarios for backchannel logout request task

    The HTTP session used by the task is replaced with a mock, so no real
    request is sent; each scenario reconfigures the mock or the provider.
    """
    issuer = "http://testserver"
    sub = "test-user-uid"

    # Setup provider with backchannel logout URI
    self.provider.logout_uri = "http://testserver/backchannel_logout"
    self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
    self.provider.save()

    # Setup mock session and response
    mock_session = Mock()
    mock_get_session.return_value = mock_session
    mock_response = Mock(spec=Response)
    mock_response.status_code = 200
    mock_response.raise_for_status.return_value = None  # No exception for successful request
    mock_session.post.return_value = mock_response

    # Scenario 1: Successful request
    # `.send()` returns a handle, not the task's return value, so asserting
    # its truthiness proves nothing. `.get_result()` raises ResultFailure
    # (as scenarios 2 and 6 rely on) if the task errored.
    send_backchannel_logout_request.send(self.provider.pk, issuer, sub=sub).get_result()
    mock_session.post.assert_called_once()
    post_kwargs = mock_session.post.call_args[1]
    self.assertIn("logout_token", post_kwargs["data"])
    self.assertEqual(
        post_kwargs["headers"]["Content-Type"], "application/x-www-form-urlencoded"
    )

    # Scenario 2: Failed request (400 response) - should raise exception
    mock_session.post.reset_mock()
    error_response = Mock(spec=Response)
    error_response.status_code = 400
    error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
    mock_session.post.return_value = error_response
    with self.assertRaises(ResultFailure):
        send_backchannel_logout_request.send(
            self.provider.pk, issuer, sub=sub
        ).get_result()

    # Scenario 3: No URI configured
    mock_session.post.reset_mock()
    self.provider.logout_uri = ""
    self.provider.save()
    result = send_backchannel_logout_request.send(
        self.provider.pk, issuer, sub=sub
    ).get_result()
    self.assertIsNone(result)
    mock_session.post.assert_not_called()

    # Scenario 4: No sub provided - should fail
    # NOTE(review): logout_uri is still empty here (from scenario 3), so this
    # may exit on the missing URI rather than the missing sub - confirm
    # against the task implementation.
    result = send_backchannel_logout_request.send(self.provider.pk, issuer).get_result()
    self.assertIsNone(result)
    mock_session.post.assert_not_called()

    # Scenario 5: Non-existent provider
    result = send_backchannel_logout_request.send(99999, issuer, sub=sub).get_result()
    self.assertIsNone(result)
    mock_session.post.assert_not_called()

    # Scenario 6: Request timeout
    mock_session.post.side_effect = Timeout("Request timed out")
    self.provider.logout_uri = "http://testserver/backchannel_logout"
    self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
    self.provider.save()
    with self.assertRaises(ResultFailure):
        send_backchannel_logout_request.send(
            self.provider.pk, issuer, sub=sub
        ).get_result()
Test various scenarios for backchannel logout request task