authentik.enterprise.providers.ssf.tests.test_auth

  1import json
  2from dataclasses import asdict
  3
  4from django.urls import reverse
  5from django.utils import timezone
  6from rest_framework.test import APITestCase
  7
  8from authentik.core.models import Application, Token, TokenIntents
  9from authentik.core.tests.utils import (
 10    create_test_admin_user,
 11    create_test_cert,
 12    create_test_flow,
 13    create_test_user,
 14)
 15from authentik.enterprise.providers.ssf.models import (
 16    SSFEventStatus,
 17    SSFProvider,
 18    Stream,
 19    StreamEvent,
 20)
 21from authentik.lib.generators import generate_id
 22from authentik.providers.oauth2.id_token import IDToken
 23from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
 24
 25
 26class TestSSFAuth(APITestCase):
 27    def setUp(self):
 28        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 29        self.provider = SSFProvider.objects.create(
 30            name=generate_id(),
 31            signing_key=create_test_cert(),
 32            backchannel_application=self.application,
 33        )
 34
 35    def test_stream_add_token(self):
 36        """test stream add (token auth)"""
 37        res = self.client.post(
 38            reverse(
 39                "authentik_providers_ssf:stream",
 40                kwargs={"application_slug": self.application.slug},
 41            ),
 42            data={
 43                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 44                "aud": ["https://app.authentik.company"],
 45                "delivery": {
 46                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 47                    "endpoint_url": "https://app.authentik.company",
 48                },
 49                "events_requested": [
 50                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 51                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 52                ],
 53                "format": "iss_sub",
 54            },
 55            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 56        )
 57        self.assertEqual(res.status_code, 201)
 58        stream = Stream.objects.filter(provider=self.provider).first()
 59        self.assertIsNotNone(stream)
 60        event = StreamEvent.objects.filter(stream=stream).first()
 61        self.assertIsNotNone(event)
 62        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 63        self.assertEqual(
 64            event.payload["events"],
 65            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
 66        )
 67
 68    def test_stream_add_oidc(self):
 69        """test stream add (oidc auth)"""
 70        provider = OAuth2Provider.objects.create(
 71            name=generate_id(),
 72            authorization_flow=create_test_flow(),
 73        )
 74        self.application.provider = provider
 75        self.application.save()
 76        user = create_test_admin_user()
 77        token = AccessToken.objects.create(
 78            provider=provider,
 79            user=user,
 80            token=generate_id(),
 81            auth_time=timezone.now(),
 82            _scope="openid user profile",
 83            _id_token=json.dumps(
 84                asdict(
 85                    IDToken("foo", "bar"),
 86                )
 87            ),
 88        )
 89
 90        res = self.client.post(
 91            reverse(
 92                "authentik_providers_ssf:stream",
 93                kwargs={"application_slug": self.application.slug},
 94            ),
 95            data={
 96                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 97                "aud": ["https://app.authentik.company"],
 98                "delivery": {
 99                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
100                    "endpoint_url": "https://app.authentik.company",
101                },
102                "events_requested": [
103                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
104                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
105                ],
106                "format": "iss_sub",
107            },
108            HTTP_AUTHORIZATION=f"Bearer {token.token}",
109        )
110        self.assertEqual(res.status_code, 201)
111        stream = Stream.objects.filter(provider=self.provider).first()
112        self.assertIsNotNone(stream)
113        event = StreamEvent.objects.filter(stream=stream).first()
114        self.assertIsNotNone(event)
115        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
116        self.assertEqual(
117            event.payload["events"],
118            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
119        )
120
121    def test_token_invalid(self):
122        res = self.client.post(
123            reverse(
124                "authentik_providers_ssf:stream",
125                kwargs={"application_slug": self.application.slug},
126            ),
127            data={
128                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
129                "aud": ["https://app.authentik.company"],
130                "delivery": {
131                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
132                    "endpoint_url": "https://app.authentik.company",
133                },
134                "events_requested": [
135                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
136                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
137                ],
138                "format": "iss_sub",
139            },
140            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a",
141        )
142        # Response code needs to be 401 according to spec
143        self.assertEqual(res.status_code, 401)
144
145    def test_token_unrelated(self):
146        token = Token.objects.create(
147            identifier=generate_id(), user=create_test_user(), intent=TokenIntents.INTENT_API
148        )
149        res = self.client.post(
150            reverse(
151                "authentik_providers_ssf:stream",
152                kwargs={"application_slug": self.application.slug},
153            ),
154            data={
155                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
156                "aud": ["https://app.authentik.company"],
157                "delivery": {
158                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
159                    "endpoint_url": "https://app.authentik.company",
160                },
161                "events_requested": [
162                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
163                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
164                ],
165                "format": "iss_sub",
166            },
167            HTTP_AUTHORIZATION=f"Bearer {token.key}",
168        )
169        # Response code needs to be 401 according to spec
170        self.assertEqual(res.status_code, 401)
class TestSSFAuth(rest_framework.test.APITestCase):
 27class TestSSFAuth(APITestCase):
 28    def setUp(self):
 29        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 30        self.provider = SSFProvider.objects.create(
 31            name=generate_id(),
 32            signing_key=create_test_cert(),
 33            backchannel_application=self.application,
 34        )
 35
 36    def test_stream_add_token(self):
 37        """test stream add (token auth)"""
 38        res = self.client.post(
 39            reverse(
 40                "authentik_providers_ssf:stream",
 41                kwargs={"application_slug": self.application.slug},
 42            ),
 43            data={
 44                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 45                "aud": ["https://app.authentik.company"],
 46                "delivery": {
 47                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 48                    "endpoint_url": "https://app.authentik.company",
 49                },
 50                "events_requested": [
 51                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 52                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 53                ],
 54                "format": "iss_sub",
 55            },
 56            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 57        )
 58        self.assertEqual(res.status_code, 201)
 59        stream = Stream.objects.filter(provider=self.provider).first()
 60        self.assertIsNotNone(stream)
 61        event = StreamEvent.objects.filter(stream=stream).first()
 62        self.assertIsNotNone(event)
 63        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 64        self.assertEqual(
 65            event.payload["events"],
 66            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
 67        )
 68
 69    def test_stream_add_oidc(self):
 70        """test stream add (oidc auth)"""
 71        provider = OAuth2Provider.objects.create(
 72            name=generate_id(),
 73            authorization_flow=create_test_flow(),
 74        )
 75        self.application.provider = provider
 76        self.application.save()
 77        user = create_test_admin_user()
 78        token = AccessToken.objects.create(
 79            provider=provider,
 80            user=user,
 81            token=generate_id(),
 82            auth_time=timezone.now(),
 83            _scope="openid user profile",
 84            _id_token=json.dumps(
 85                asdict(
 86                    IDToken("foo", "bar"),
 87                )
 88            ),
 89        )
 90
 91        res = self.client.post(
 92            reverse(
 93                "authentik_providers_ssf:stream",
 94                kwargs={"application_slug": self.application.slug},
 95            ),
 96            data={
 97                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 98                "aud": ["https://app.authentik.company"],
 99                "delivery": {
100                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
101                    "endpoint_url": "https://app.authentik.company",
102                },
103                "events_requested": [
104                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
105                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
106                ],
107                "format": "iss_sub",
108            },
109            HTTP_AUTHORIZATION=f"Bearer {token.token}",
110        )
111        self.assertEqual(res.status_code, 201)
112        stream = Stream.objects.filter(provider=self.provider).first()
113        self.assertIsNotNone(stream)
114        event = StreamEvent.objects.filter(stream=stream).first()
115        self.assertIsNotNone(event)
116        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
117        self.assertEqual(
118            event.payload["events"],
119            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
120        )
121
122    def test_token_invalid(self):
123        res = self.client.post(
124            reverse(
125                "authentik_providers_ssf:stream",
126                kwargs={"application_slug": self.application.slug},
127            ),
128            data={
129                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
130                "aud": ["https://app.authentik.company"],
131                "delivery": {
132                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
133                    "endpoint_url": "https://app.authentik.company",
134                },
135                "events_requested": [
136                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
137                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
138                ],
139                "format": "iss_sub",
140            },
141            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a",
142        )
143        # Response code needs to be 401 according to spec
144        self.assertEqual(res.status_code, 401)
145
146    def test_token_unrelated(self):
147        token = Token.objects.create(
148            identifier=generate_id(), user=create_test_user(), intent=TokenIntents.INTENT_API
149        )
150        res = self.client.post(
151            reverse(
152                "authentik_providers_ssf:stream",
153                kwargs={"application_slug": self.application.slug},
154            ),
155            data={
156                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
157                "aud": ["https://app.authentik.company"],
158                "delivery": {
159                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
160                    "endpoint_url": "https://app.authentik.company",
161                },
162                "events_requested": [
163                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
164                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
165                ],
166                "format": "iss_sub",
167            },
168            HTTP_AUTHORIZATION=f"Bearer {token.key}",
169        )
170        # Response code needs to be 401 according to spec
171        self.assertEqual(res.status_code, 401)

Similar to TransactionTestCase, but uses transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
28    def setUp(self):
29        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
30        self.provider = SSFProvider.objects.create(
31            name=generate_id(),
32            signing_key=create_test_cert(),
33            backchannel_application=self.application,
34        )

Hook method for setting up the test fixture before exercising it.

def test_stream_add_token(self):
36    def test_stream_add_token(self):
37        """test stream add (token auth)"""
38        res = self.client.post(
39            reverse(
40                "authentik_providers_ssf:stream",
41                kwargs={"application_slug": self.application.slug},
42            ),
43            data={
44                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
45                "aud": ["https://app.authentik.company"],
46                "delivery": {
47                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
48                    "endpoint_url": "https://app.authentik.company",
49                },
50                "events_requested": [
51                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
52                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
53                ],
54                "format": "iss_sub",
55            },
56            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
57        )
58        self.assertEqual(res.status_code, 201)
59        stream = Stream.objects.filter(provider=self.provider).first()
60        self.assertIsNotNone(stream)
61        event = StreamEvent.objects.filter(stream=stream).first()
62        self.assertIsNotNone(event)
63        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
64        self.assertEqual(
65            event.payload["events"],
66            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
67        )

Test adding a stream (token auth).

def test_stream_add_oidc(self):
 69    def test_stream_add_oidc(self):
 70        """test stream add (oidc auth)"""
 71        provider = OAuth2Provider.objects.create(
 72            name=generate_id(),
 73            authorization_flow=create_test_flow(),
 74        )
 75        self.application.provider = provider
 76        self.application.save()
 77        user = create_test_admin_user()
 78        token = AccessToken.objects.create(
 79            provider=provider,
 80            user=user,
 81            token=generate_id(),
 82            auth_time=timezone.now(),
 83            _scope="openid user profile",
 84            _id_token=json.dumps(
 85                asdict(
 86                    IDToken("foo", "bar"),
 87                )
 88            ),
 89        )
 90
 91        res = self.client.post(
 92            reverse(
 93                "authentik_providers_ssf:stream",
 94                kwargs={"application_slug": self.application.slug},
 95            ),
 96            data={
 97                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 98                "aud": ["https://app.authentik.company"],
 99                "delivery": {
100                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
101                    "endpoint_url": "https://app.authentik.company",
102                },
103                "events_requested": [
104                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
105                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
106                ],
107                "format": "iss_sub",
108            },
109            HTTP_AUTHORIZATION=f"Bearer {token.token}",
110        )
111        self.assertEqual(res.status_code, 201)
112        stream = Stream.objects.filter(provider=self.provider).first()
113        self.assertIsNotNone(stream)
114        event = StreamEvent.objects.filter(stream=stream).first()
115        self.assertIsNotNone(event)
116        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
117        self.assertEqual(
118            event.payload["events"],
119            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
120        )

Test adding a stream (OIDC auth).

def test_token_invalid(self):
122    def test_token_invalid(self):
123        res = self.client.post(
124            reverse(
125                "authentik_providers_ssf:stream",
126                kwargs={"application_slug": self.application.slug},
127            ),
128            data={
129                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
130                "aud": ["https://app.authentik.company"],
131                "delivery": {
132                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
133                    "endpoint_url": "https://app.authentik.company",
134                },
135                "events_requested": [
136                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
137                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
138                ],
139                "format": "iss_sub",
140            },
141            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a",
142        )
143        # Response code needs to be 401 according to spec
144        self.assertEqual(res.status_code, 401)
def test_token_unrelated(self):
146    def test_token_unrelated(self):
147        token = Token.objects.create(
148            identifier=generate_id(), user=create_test_user(), intent=TokenIntents.INTENT_API
149        )
150        res = self.client.post(
151            reverse(
152                "authentik_providers_ssf:stream",
153                kwargs={"application_slug": self.application.slug},
154            ),
155            data={
156                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
157                "aud": ["https://app.authentik.company"],
158                "delivery": {
159                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
160                    "endpoint_url": "https://app.authentik.company",
161                },
162                "events_requested": [
163                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
164                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
165                ],
166                "format": "iss_sub",
167            },
168            HTTP_AUTHORIZATION=f"Bearer {token.key}",
169        )
170        # Response code needs to be 401 according to spec
171        self.assertEqual(res.status_code, 401)