authentik.providers.oauth2.tests.test_backchannel_logout

Test OAuth2 Back-Channel Logout implementation

  1"""Test OAuth2 Back-Channel Logout implementation"""
  2
  3from unittest.mock import Mock, patch
  4
  5import jwt
  6from django.test import RequestFactory
  7from dramatiq.results.errors import ResultFailure
  8from requests import Response
  9from requests.exceptions import HTTPError, Timeout
 10
 11from authentik.core.models import Application
 12from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 13from authentik.lib.generators import generate_id
 14from authentik.providers.oauth2.id_token import hash_session_key
 15from authentik.providers.oauth2.models import (
 16    OAuth2LogoutMethod,
 17    OAuth2Provider,
 18    RedirectURI,
 19    RedirectURIMatchingMode,
 20)
 21from authentik.providers.oauth2.tasks import send_backchannel_logout_request
 22from authentik.providers.oauth2.tests.utils import OAuthTestCase
 23from authentik.providers.oauth2.utils import create_logout_token
 24
 25
 26class TestBackChannelLogout(OAuthTestCase):
 27    """Test Back-Channel Logout functionality"""
 28
 29    def setUp(self) -> None:
 30        super().setUp()
 31        self.factory = RequestFactory()
 32        self.user = create_test_admin_user()
 33        self.app = Application.objects.create(name=generate_id(), slug="test-app")
 34        self.provider = OAuth2Provider.objects.create(
 35            name=generate_id(),
 36            authorization_flow=create_test_flow(),
 37            redirect_uris=[
 38                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
 39            ],
 40            signing_key=self.keypair,
 41        )
 42        self.app.provider = self.provider
 43        self.app.save()
 44
 45    def _create_logout_token(
 46        self,
 47        provider: OAuth2Provider | None = None,
 48        session_id: str | None = None,
 49        sub: str | None = None,
 50    ):
 51        """Create a logout token with the given parameters"""
 52        provider = provider or self.provider
 53
 54        # Create a token with the same issuer that the view will expect
 55        # Use the same request object that will be used in the test
 56        request = self.factory.post("/backchannel_logout")
 57
 58        return create_logout_token(
 59            iss=provider.get_issuer(request),
 60            provider=provider,
 61            session_key=session_id,
 62            sub=sub,
 63        )
 64
 65    def _decode_token(self, token, provider=None):
 66        """Helper to decode and validate a JWT token"""
 67        decoded = self._decode_token_complete(token, provider)
 68        return decoded["payload"]
 69
 70    def _decode_token_complete(self, token, provider=None):
 71        """Helper to decode and validate a JWT token into a header, and payload dict"""
 72        provider = provider or self.provider
 73        key, alg = provider.jwt_key
 74        if alg != "HS256":
 75            key = provider.signing_key.public_key
 76        return jwt.decode_complete(
 77            token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False}
 78        )
 79
 80    def test_create_logout_token_variants(self):
 81        """Test creating logout tokens with different combinations of parameters"""
 82        # Test case 1: With session_id only
 83        session_id = "test-session-123"
 84        token1 = self._create_logout_token(session_id=session_id)
 85        decoded1 = self._decode_token(token1)
 86
 87        self.assertIn("iss", decoded1)
 88        self.assertEqual(decoded1["aud"], self.provider.client_id)
 89        self.assertIn("iat", decoded1)
 90        self.assertIn("jti", decoded1)
 91        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
 92        self.assertIn("events", decoded1)
 93        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
 94        self.assertNotIn("sub", decoded1)
 95
 96        # Test case 2: With sub only
 97        sub = "user-123"
 98        token2 = self._create_logout_token(sub=sub)
 99        decoded2 = self._decode_token(token2)
100
101        self.assertEqual(decoded2["sub"], sub)
102        self.assertIn("events", decoded2)
103        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
104        self.assertNotIn("sid", decoded2)
105
106        # Test case 3: With both session_id and sub
107        token3 = self._create_logout_token(session_id=session_id, sub=sub)
108        decoded3 = self._decode_token(token3)
109
110        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
111        self.assertEqual(decoded3["sub"], sub)
112        self.assertIn("events", decoded3)
113
114    def test_create_logout_token_header_type(self):
115        """Test creating logout tokens and checking if the token header type is correct"""
116        session_id = "test-session-123"
117        token1 = self._create_logout_token(session_id=session_id)
118
119        decoded = self._decode_token_complete(token1)
120
121        self.assertIsNotNone(decoded["header"])
122        self.assertEqual(decoded["header"]["typ"], "logout+jwt")
123
124    @patch("authentik.providers.oauth2.tasks.get_http_session")
125    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
126        """Test various scenarios for backchannel logout request task"""
127        # Setup provider with backchannel logout URI
128
129        self.provider.logout_uri = "http://testserver/backchannel_logout"
130        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
131        self.provider.save()
132
133        # Setup mock session and response
134        mock_session = Mock()
135        mock_get_session.return_value = mock_session
136        mock_response = Mock(spec=Response)
137        mock_response.status_code = 200
138        mock_response.raise_for_status.return_value = None  # No exception for successful request
139        mock_session.post.return_value = mock_response
140
141        result = send_backchannel_logout_request.send(
142            self.provider.pk, "http://testserver", sub="test-user-uid"
143        )
144        self.assertTrue(result)
145        mock_session.post.assert_called_once()
146        call_args = mock_session.post.call_args
147        self.assertIn("logout_token", call_args[1]["data"])
148        self.assertEqual(
149            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
150        )
151
152        # Scenario 2: Failed request (400 response) - should raise exception
153        mock_session.post.reset_mock()
154        error_response = Mock(spec=Response)
155        error_response.status_code = 400
156        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
157        mock_session.post.return_value = error_response
158        with self.assertRaises(ResultFailure):
159            send_backchannel_logout_request.send(
160                self.provider.pk, "http://testserver", sub="test-user-uid"
161            ).get_result()
162
163        # Scenario 3: No URI configured
164        mock_session.post.reset_mock()
165        self.provider.logout_uri = ""
166        self.provider.save()
167        result = send_backchannel_logout_request.send(
168            self.provider.pk, "http://testserver", sub="test-user-uid"
169        ).get_result()
170        self.assertIsNone(result)
171        mock_session.post.assert_not_called()
172
173        # Scenario 4: No sub provided - should fail
174        result = send_backchannel_logout_request.send(
175            self.provider.pk, "http://testserver"
176        ).get_result()
177        self.assertIsNone(result)
178
179        # Scenario 5: Non-existent provider
180        result = send_backchannel_logout_request.send(
181            99999, "http://testserver", sub="test-user-uid"
182        ).get_result()
183        self.assertIsNone(result)
184
185        # Scenario 6: Request timeout
186        mock_session.post.side_effect = Timeout("Request timed out")
187        self.provider.logout_uri = "http://testserver/backchannel_logout"
188        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
189        self.provider.save()
190        with self.assertRaises(ResultFailure):
191            send_backchannel_logout_request.send(
192                self.provider.pk, "http://testserver", sub="test-user-uid"
193            ).get_result()
class TestBackChannelLogout(authentik.providers.oauth2.tests.utils.OAuthTestCase):
 27class TestBackChannelLogout(OAuthTestCase):
 28    """Test Back-Channel Logout functionality"""
 29
 30    def setUp(self) -> None:
 31        super().setUp()
 32        self.factory = RequestFactory()
 33        self.user = create_test_admin_user()
 34        self.app = Application.objects.create(name=generate_id(), slug="test-app")
 35        self.provider = OAuth2Provider.objects.create(
 36            name=generate_id(),
 37            authorization_flow=create_test_flow(),
 38            redirect_uris=[
 39                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
 40            ],
 41            signing_key=self.keypair,
 42        )
 43        self.app.provider = self.provider
 44        self.app.save()
 45
 46    def _create_logout_token(
 47        self,
 48        provider: OAuth2Provider | None = None,
 49        session_id: str | None = None,
 50        sub: str | None = None,
 51    ):
 52        """Create a logout token with the given parameters"""
 53        provider = provider or self.provider
 54
 55        # Create a token with the same issuer that the view will expect
 56        # Use the same request object that will be used in the test
 57        request = self.factory.post("/backchannel_logout")
 58
 59        return create_logout_token(
 60            iss=provider.get_issuer(request),
 61            provider=provider,
 62            session_key=session_id,
 63            sub=sub,
 64        )
 65
 66    def _decode_token(self, token, provider=None):
 67        """Helper to decode and validate a JWT token"""
 68        decoded = self._decode_token_complete(token, provider)
 69        return decoded["payload"]
 70
 71    def _decode_token_complete(self, token, provider=None):
 72        """Helper to decode and validate a JWT token into a header, and payload dict"""
 73        provider = provider or self.provider
 74        key, alg = provider.jwt_key
 75        if alg != "HS256":
 76            key = provider.signing_key.public_key
 77        return jwt.decode_complete(
 78            token, key, algorithms=[alg], options={"verify_exp": False, "verify_aud": False}
 79        )
 80
 81    def test_create_logout_token_variants(self):
 82        """Test creating logout tokens with different combinations of parameters"""
 83        # Test case 1: With session_id only
 84        session_id = "test-session-123"
 85        token1 = self._create_logout_token(session_id=session_id)
 86        decoded1 = self._decode_token(token1)
 87
 88        self.assertIn("iss", decoded1)
 89        self.assertEqual(decoded1["aud"], self.provider.client_id)
 90        self.assertIn("iat", decoded1)
 91        self.assertIn("jti", decoded1)
 92        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
 93        self.assertIn("events", decoded1)
 94        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
 95        self.assertNotIn("sub", decoded1)
 96
 97        # Test case 2: With sub only
 98        sub = "user-123"
 99        token2 = self._create_logout_token(sub=sub)
100        decoded2 = self._decode_token(token2)
101
102        self.assertEqual(decoded2["sub"], sub)
103        self.assertIn("events", decoded2)
104        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
105        self.assertNotIn("sid", decoded2)
106
107        # Test case 3: With both session_id and sub
108        token3 = self._create_logout_token(session_id=session_id, sub=sub)
109        decoded3 = self._decode_token(token3)
110
111        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
112        self.assertEqual(decoded3["sub"], sub)
113        self.assertIn("events", decoded3)
114
115    def test_create_logout_token_header_type(self):
116        """Test creating logout tokens and checking if the token header type is correct"""
117        session_id = "test-session-123"
118        token1 = self._create_logout_token(session_id=session_id)
119
120        decoded = self._decode_token_complete(token1)
121
122        self.assertIsNotNone(decoded["header"])
123        self.assertEqual(decoded["header"]["typ"], "logout+jwt")
124
125    @patch("authentik.providers.oauth2.tasks.get_http_session")
126    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
127        """Test various scenarios for backchannel logout request task"""
128        # Setup provider with backchannel logout URI
129
130        self.provider.logout_uri = "http://testserver/backchannel_logout"
131        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
132        self.provider.save()
133
134        # Setup mock session and response
135        mock_session = Mock()
136        mock_get_session.return_value = mock_session
137        mock_response = Mock(spec=Response)
138        mock_response.status_code = 200
139        mock_response.raise_for_status.return_value = None  # No exception for successful request
140        mock_session.post.return_value = mock_response
141
142        result = send_backchannel_logout_request.send(
143            self.provider.pk, "http://testserver", sub="test-user-uid"
144        )
145        self.assertTrue(result)
146        mock_session.post.assert_called_once()
147        call_args = mock_session.post.call_args
148        self.assertIn("logout_token", call_args[1]["data"])
149        self.assertEqual(
150            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
151        )
152
153        # Scenario 2: Failed request (400 response) - should raise exception
154        mock_session.post.reset_mock()
155        error_response = Mock(spec=Response)
156        error_response.status_code = 400
157        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
158        mock_session.post.return_value = error_response
159        with self.assertRaises(ResultFailure):
160            send_backchannel_logout_request.send(
161                self.provider.pk, "http://testserver", sub="test-user-uid"
162            ).get_result()
163
164        # Scenario 3: No URI configured
165        mock_session.post.reset_mock()
166        self.provider.logout_uri = ""
167        self.provider.save()
168        result = send_backchannel_logout_request.send(
169            self.provider.pk, "http://testserver", sub="test-user-uid"
170        ).get_result()
171        self.assertIsNone(result)
172        mock_session.post.assert_not_called()
173
174        # Scenario 4: No sub provided - should fail
175        result = send_backchannel_logout_request.send(
176            self.provider.pk, "http://testserver"
177        ).get_result()
178        self.assertIsNone(result)
179
180        # Scenario 5: Non-existent provider
181        result = send_backchannel_logout_request.send(
182            99999, "http://testserver", sub="test-user-uid"
183        ).get_result()
184        self.assertIsNone(result)
185
186        # Scenario 6: Request timeout
187        mock_session.post.side_effect = Timeout("Request timed out")
188        self.provider.logout_uri = "http://testserver/backchannel_logout"
189        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
190        self.provider.save()
191        with self.assertRaises(ResultFailure):
192            send_backchannel_logout_request.send(
193                self.provider.pk, "http://testserver", sub="test-user-uid"
194            ).get_result()

Test Back-Channel Logout functionality

def setUp(self) -> None:
30    def setUp(self) -> None:
31        super().setUp()
32        self.factory = RequestFactory()
33        self.user = create_test_admin_user()
34        self.app = Application.objects.create(name=generate_id(), slug="test-app")
35        self.provider = OAuth2Provider.objects.create(
36            name=generate_id(),
37            authorization_flow=create_test_flow(),
38            redirect_uris=[
39                RedirectURI(RedirectURIMatchingMode.STRICT, "http://testserver/callback"),
40            ],
41            signing_key=self.keypair,
42        )
43        self.app.provider = self.provider
44        self.app.save()

Hook method for setting up the test fixture before exercising it.

def test_create_logout_token_variants(self):
 81    def test_create_logout_token_variants(self):
 82        """Test creating logout tokens with different combinations of parameters"""
 83        # Test case 1: With session_id only
 84        session_id = "test-session-123"
 85        token1 = self._create_logout_token(session_id=session_id)
 86        decoded1 = self._decode_token(token1)
 87
 88        self.assertIn("iss", decoded1)
 89        self.assertEqual(decoded1["aud"], self.provider.client_id)
 90        self.assertIn("iat", decoded1)
 91        self.assertIn("jti", decoded1)
 92        self.assertEqual(decoded1["sid"], hash_session_key(session_id))
 93        self.assertIn("events", decoded1)
 94        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded1["events"])
 95        self.assertNotIn("sub", decoded1)
 96
 97        # Test case 2: With sub only
 98        sub = "user-123"
 99        token2 = self._create_logout_token(sub=sub)
100        decoded2 = self._decode_token(token2)
101
102        self.assertEqual(decoded2["sub"], sub)
103        self.assertIn("events", decoded2)
104        self.assertIn("http://schemas.openid.net/event/backchannel-logout", decoded2["events"])
105        self.assertNotIn("sid", decoded2)
106
107        # Test case 3: With both session_id and sub
108        token3 = self._create_logout_token(session_id=session_id, sub=sub)
109        decoded3 = self._decode_token(token3)
110
111        self.assertEqual(decoded3["sid"], hash_session_key(session_id))
112        self.assertEqual(decoded3["sub"], sub)
113        self.assertIn("events", decoded3)

Test creating logout tokens with different combinations of parameters

def test_create_logout_token_header_type(self):
115    def test_create_logout_token_header_type(self):
116        """Test creating logout tokens and checking if the token header type is correct"""
117        session_id = "test-session-123"
118        token1 = self._create_logout_token(session_id=session_id)
119
120        decoded = self._decode_token_complete(token1)
121
122        self.assertIsNotNone(decoded["header"])
123        self.assertEqual(decoded["header"]["typ"], "logout+jwt")

Test creating logout tokens and checking if the token header type is correct

@patch('authentik.providers.oauth2.tasks.get_http_session')
def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
125    @patch("authentik.providers.oauth2.tasks.get_http_session")
126    def test_send_backchannel_logout_request_scenarios(self, mock_get_session):
127        """Test various scenarios for backchannel logout request task"""
128        # Setup provider with backchannel logout URI
129
130        self.provider.logout_uri = "http://testserver/backchannel_logout"
131        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
132        self.provider.save()
133
134        # Setup mock session and response
135        mock_session = Mock()
136        mock_get_session.return_value = mock_session
137        mock_response = Mock(spec=Response)
138        mock_response.status_code = 200
139        mock_response.raise_for_status.return_value = None  # No exception for successful request
140        mock_session.post.return_value = mock_response
141
142        result = send_backchannel_logout_request.send(
143            self.provider.pk, "http://testserver", sub="test-user-uid"
144        )
145        self.assertTrue(result)
146        mock_session.post.assert_called_once()
147        call_args = mock_session.post.call_args
148        self.assertIn("logout_token", call_args[1]["data"])
149        self.assertEqual(
150            call_args[1]["headers"]["Content-Type"], "application/x-www-form-urlencoded"
151        )
152
153        # Scenario 2: Failed request (400 response) - should raise exception
154        mock_session.post.reset_mock()
155        error_response = Mock(spec=Response)
156        error_response.status_code = 400
157        error_response.raise_for_status.side_effect = HTTPError("HTTP 400")
158        mock_session.post.return_value = error_response
159        with self.assertRaises(ResultFailure):
160            send_backchannel_logout_request.send(
161                self.provider.pk, "http://testserver", sub="test-user-uid"
162            ).get_result()
163
164        # Scenario 3: No URI configured
165        mock_session.post.reset_mock()
166        self.provider.logout_uri = ""
167        self.provider.save()
168        result = send_backchannel_logout_request.send(
169            self.provider.pk, "http://testserver", sub="test-user-uid"
170        ).get_result()
171        self.assertIsNone(result)
172        mock_session.post.assert_not_called()
173
174        # Scenario 4: No sub provided - should fail
175        result = send_backchannel_logout_request.send(
176            self.provider.pk, "http://testserver"
177        ).get_result()
178        self.assertIsNone(result)
179
180        # Scenario 5: Non-existent provider
181        result = send_backchannel_logout_request.send(
182            99999, "http://testserver", sub="test-user-uid"
183        ).get_result()
184        self.assertIsNone(result)
185
186        # Scenario 6: Request timeout
187        mock_session.post.side_effect = Timeout("Request timed out")
188        self.provider.logout_uri = "http://testserver/backchannel_logout"
189        self.provider.logout_method = OAuth2LogoutMethod.BACKCHANNEL
190        self.provider.save()
191        with self.assertRaises(ResultFailure):
192            send_backchannel_logout_request.send(
193                self.provider.pk, "http://testserver", sub="test-user-uid"
194            ).get_result()

Test various scenarios for backchannel logout request task