authentik.enterprise.providers.ssf.tests.test_signals

  1from uuid import uuid4
  2
  3from django.urls import reverse
  4from rest_framework.test import APITestCase
  5
  6from authentik.core.models import Application, Group
  7from authentik.core.tests.utils import (
  8    create_test_cert,
  9    create_test_user,
 10)
 11from authentik.enterprise.providers.ssf.models import (
 12    EventTypes,
 13    SSFEventStatus,
 14    SSFProvider,
 15    Stream,
 16    StreamEvent,
 17)
 18from authentik.lib.generators import generate_id
 19from authentik.policies.models import PolicyBinding
 20from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
 21
 22
 23class TestSignals(APITestCase):
 24    """Test individual SSF Signals"""
 25
 26    def setUp(self):
 27        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 28        self.provider = SSFProvider.objects.create(
 29            name=generate_id(),
 30            signing_key=create_test_cert(),
 31            backchannel_application=self.application,
 32        )
 33        res = self.client.post(
 34            reverse(
 35                "authentik_providers_ssf:stream",
 36                kwargs={"application_slug": self.application.slug},
 37            ),
 38            data={
 39                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 40                "aud": ["https://app.authentik.company"],
 41                "delivery": {
 42                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 43                    "endpoint_url": "https://app.authentik.company",
 44                },
 45                "events_requested": [
 46                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 47                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 48                ],
 49                "format": "iss_sub",
 50            },
 51            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 52        )
 53        self.assertEqual(res.status_code, 201, res.content)
 54
 55    def test_signal_logout(self):
 56        """Test user logout"""
 57        user = create_test_user()
 58        self.client.force_login(user)
 59        self.client.logout()
 60
 61        stream = Stream.objects.filter(provider=self.provider).first()
 62        self.assertIsNotNone(stream)
 63        event = StreamEvent.objects.filter(stream=stream).first()
 64        self.assertIsNotNone(event)
 65        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 66        event_payload = event.payload["events"][
 67            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
 68        ]
 69        self.assertEqual(event_payload["initiating_entity"], "user")
 70        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 71        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
 72        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 73        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 74
 75    def test_signal_password_change(self):
 76        """Test user password change"""
 77        user = create_test_user()
 78        self.client.force_login(user)
 79        user.set_password(generate_id())
 80        user.save()
 81
 82        stream = Stream.objects.filter(provider=self.provider).first()
 83        self.assertIsNotNone(stream)
 84        event = StreamEvent.objects.filter(stream=stream).first()
 85        self.assertIsNotNone(event)
 86        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 87        event_payload = event.payload["events"][
 88            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
 89        ]
 90        self.assertEqual(event_payload["change_type"], "update")
 91        self.assertEqual(event_payload["credential_type"], "password")
 92        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 93        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 94        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 95
 96    def test_signal_authenticator_added(self):
 97        """Test authenticator creation signal"""
 98        user = create_test_user()
 99        self.client.force_login(user)
100        dev = WebAuthnDevice.objects.create(
101            user=user,
102            name=generate_id(),
103            credential_id=generate_id(),
104            public_key=generate_id(),
105            aaguid=str(uuid4()),
106        )
107
108        stream = Stream.objects.filter(provider=self.provider).first()
109        self.assertIsNotNone(stream)
110        event = StreamEvent.objects.filter(stream=stream).exclude().first()
111        self.assertIsNotNone(event)
112        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
113        event_payload = event.payload["events"][
114            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
115        ]
116        self.assertEqual(event_payload["change_type"], "create")
117        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
118        self.assertEqual(event_payload["friendly_name"], dev.name)
119        self.assertEqual(event_payload["credential_type"], "fido-u2f")
120        self.assertEqual(event.payload["sub_id"]["format"], "complex")
121        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
122        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
123
124    def test_signal_authenticator_deleted(self):
125        """Test authenticator deletion signal"""
126        user = create_test_user()
127        self.client.force_login(user)
128        dev = WebAuthnDevice.objects.create(
129            user=user,
130            name=generate_id(),
131            credential_id=generate_id(),
132            public_key=generate_id(),
133            aaguid=str(uuid4()),
134        )
135        dev.delete()
136
137        stream = Stream.objects.filter(provider=self.provider).first()
138        self.assertIsNotNone(stream)
139        event = StreamEvent.objects.filter(stream=stream).exclude().first()
140        self.assertIsNotNone(event)
141        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
142        event_payload = event.payload["events"][
143            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
144        ]
145        self.assertEqual(event_payload["change_type"], "delete")
146        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
147        self.assertEqual(event_payload["friendly_name"], dev.name)
148        self.assertEqual(event_payload["credential_type"], "fido-u2f")
149        self.assertEqual(event.payload["sub_id"]["format"], "complex")
150        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
151        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
152
153    def test_signal_policy_ignore(self):
154        """Test event not being created for user that doesn't have access to the application"""
155        PolicyBinding.objects.create(
156            target=self.application, group=Group.objects.create(name=generate_id()), order=0
157        )
158        user = create_test_user()
159        self.client.force_login(user)
160        user.set_password(generate_id())
161        user.save()
162
163        stream = Stream.objects.filter(provider=self.provider).first()
164        self.assertIsNotNone(stream)
165        event = StreamEvent.objects.filter(
166            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
167        ).first()
168        self.assertIsNone(event)
class TestSignals(rest_framework.test.APITestCase):
 24class TestSignals(APITestCase):
 25    """Test individual SSF Signals"""
 26
 27    def setUp(self):
 28        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 29        self.provider = SSFProvider.objects.create(
 30            name=generate_id(),
 31            signing_key=create_test_cert(),
 32            backchannel_application=self.application,
 33        )
 34        res = self.client.post(
 35            reverse(
 36                "authentik_providers_ssf:stream",
 37                kwargs={"application_slug": self.application.slug},
 38            ),
 39            data={
 40                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 41                "aud": ["https://app.authentik.company"],
 42                "delivery": {
 43                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 44                    "endpoint_url": "https://app.authentik.company",
 45                },
 46                "events_requested": [
 47                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 48                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 49                ],
 50                "format": "iss_sub",
 51            },
 52            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 53        )
 54        self.assertEqual(res.status_code, 201, res.content)
 55
 56    def test_signal_logout(self):
 57        """Test user logout"""
 58        user = create_test_user()
 59        self.client.force_login(user)
 60        self.client.logout()
 61
 62        stream = Stream.objects.filter(provider=self.provider).first()
 63        self.assertIsNotNone(stream)
 64        event = StreamEvent.objects.filter(stream=stream).first()
 65        self.assertIsNotNone(event)
 66        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 67        event_payload = event.payload["events"][
 68            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
 69        ]
 70        self.assertEqual(event_payload["initiating_entity"], "user")
 71        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 72        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
 73        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 74        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 75
 76    def test_signal_password_change(self):
 77        """Test user password change"""
 78        user = create_test_user()
 79        self.client.force_login(user)
 80        user.set_password(generate_id())
 81        user.save()
 82
 83        stream = Stream.objects.filter(provider=self.provider).first()
 84        self.assertIsNotNone(stream)
 85        event = StreamEvent.objects.filter(stream=stream).first()
 86        self.assertIsNotNone(event)
 87        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 88        event_payload = event.payload["events"][
 89            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
 90        ]
 91        self.assertEqual(event_payload["change_type"], "update")
 92        self.assertEqual(event_payload["credential_type"], "password")
 93        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 94        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 95        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 96
 97    def test_signal_authenticator_added(self):
 98        """Test authenticator creation signal"""
 99        user = create_test_user()
100        self.client.force_login(user)
101        dev = WebAuthnDevice.objects.create(
102            user=user,
103            name=generate_id(),
104            credential_id=generate_id(),
105            public_key=generate_id(),
106            aaguid=str(uuid4()),
107        )
108
109        stream = Stream.objects.filter(provider=self.provider).first()
110        self.assertIsNotNone(stream)
111        event = StreamEvent.objects.filter(stream=stream).exclude().first()
112        self.assertIsNotNone(event)
113        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
114        event_payload = event.payload["events"][
115            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
116        ]
117        self.assertEqual(event_payload["change_type"], "create")
118        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
119        self.assertEqual(event_payload["friendly_name"], dev.name)
120        self.assertEqual(event_payload["credential_type"], "fido-u2f")
121        self.assertEqual(event.payload["sub_id"]["format"], "complex")
122        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
123        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
124
125    def test_signal_authenticator_deleted(self):
126        """Test authenticator deletion signal"""
127        user = create_test_user()
128        self.client.force_login(user)
129        dev = WebAuthnDevice.objects.create(
130            user=user,
131            name=generate_id(),
132            credential_id=generate_id(),
133            public_key=generate_id(),
134            aaguid=str(uuid4()),
135        )
136        dev.delete()
137
138        stream = Stream.objects.filter(provider=self.provider).first()
139        self.assertIsNotNone(stream)
140        event = StreamEvent.objects.filter(stream=stream).exclude().first()
141        self.assertIsNotNone(event)
142        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
143        event_payload = event.payload["events"][
144            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
145        ]
146        self.assertEqual(event_payload["change_type"], "delete")
147        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
148        self.assertEqual(event_payload["friendly_name"], dev.name)
149        self.assertEqual(event_payload["credential_type"], "fido-u2f")
150        self.assertEqual(event.payload["sub_id"]["format"], "complex")
151        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
152        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
153
154    def test_signal_policy_ignore(self):
155        """Test event not being created for user that doesn't have access to the application"""
156        PolicyBinding.objects.create(
157            target=self.application, group=Group.objects.create(name=generate_id()), order=0
158        )
159        user = create_test_user()
160        self.client.force_login(user)
161        user.set_password(generate_id())
162        user.save()
163
164        stream = Stream.objects.filter(provider=self.provider).first()
165        self.assertIsNotNone(stream)
166        event = StreamEvent.objects.filter(
167            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
168        ).first()
169        self.assertIsNone(event)

Test individual SSF Signals

def setUp(self):
27    def setUp(self):
28        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
29        self.provider = SSFProvider.objects.create(
30            name=generate_id(),
31            signing_key=create_test_cert(),
32            backchannel_application=self.application,
33        )
34        res = self.client.post(
35            reverse(
36                "authentik_providers_ssf:stream",
37                kwargs={"application_slug": self.application.slug},
38            ),
39            data={
40                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
41                "aud": ["https://app.authentik.company"],
42                "delivery": {
43                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
44                    "endpoint_url": "https://app.authentik.company",
45                },
46                "events_requested": [
47                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
48                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
49                ],
50                "format": "iss_sub",
51            },
52            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
53        )
54        self.assertEqual(res.status_code, 201, res.content)

Hook method for setting up the test fixture before exercising it.

def test_signal_logout(self):
56    def test_signal_logout(self):
57        """Test user logout"""
58        user = create_test_user()
59        self.client.force_login(user)
60        self.client.logout()
61
62        stream = Stream.objects.filter(provider=self.provider).first()
63        self.assertIsNotNone(stream)
64        event = StreamEvent.objects.filter(stream=stream).first()
65        self.assertIsNotNone(event)
66        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
67        event_payload = event.payload["events"][
68            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
69        ]
70        self.assertEqual(event_payload["initiating_entity"], "user")
71        self.assertEqual(event.payload["sub_id"]["format"], "complex")
72        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
73        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
74        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test user logout

def test_signal_password_change(self):
76    def test_signal_password_change(self):
77        """Test user password change"""
78        user = create_test_user()
79        self.client.force_login(user)
80        user.set_password(generate_id())
81        user.save()
82
83        stream = Stream.objects.filter(provider=self.provider).first()
84        self.assertIsNotNone(stream)
85        event = StreamEvent.objects.filter(stream=stream).first()
86        self.assertIsNotNone(event)
87        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
88        event_payload = event.payload["events"][
89            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
90        ]
91        self.assertEqual(event_payload["change_type"], "update")
92        self.assertEqual(event_payload["credential_type"], "password")
93        self.assertEqual(event.payload["sub_id"]["format"], "complex")
94        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
95        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test user password change

def test_signal_authenticator_added(self):
 97    def test_signal_authenticator_added(self):
 98        """Test authenticator creation signal"""
 99        user = create_test_user()
100        self.client.force_login(user)
101        dev = WebAuthnDevice.objects.create(
102            user=user,
103            name=generate_id(),
104            credential_id=generate_id(),
105            public_key=generate_id(),
106            aaguid=str(uuid4()),
107        )
108
109        stream = Stream.objects.filter(provider=self.provider).first()
110        self.assertIsNotNone(stream)
111        event = StreamEvent.objects.filter(stream=stream).exclude().first()
112        self.assertIsNotNone(event)
113        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
114        event_payload = event.payload["events"][
115            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
116        ]
117        self.assertEqual(event_payload["change_type"], "create")
118        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
119        self.assertEqual(event_payload["friendly_name"], dev.name)
120        self.assertEqual(event_payload["credential_type"], "fido-u2f")
121        self.assertEqual(event.payload["sub_id"]["format"], "complex")
122        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
123        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test authenticator creation signal

def test_signal_authenticator_deleted(self):
125    def test_signal_authenticator_deleted(self):
126        """Test authenticator deletion signal"""
127        user = create_test_user()
128        self.client.force_login(user)
129        dev = WebAuthnDevice.objects.create(
130            user=user,
131            name=generate_id(),
132            credential_id=generate_id(),
133            public_key=generate_id(),
134            aaguid=str(uuid4()),
135        )
136        dev.delete()
137
138        stream = Stream.objects.filter(provider=self.provider).first()
139        self.assertIsNotNone(stream)
140        event = StreamEvent.objects.filter(stream=stream).exclude().first()
141        self.assertIsNotNone(event)
142        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
143        event_payload = event.payload["events"][
144            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
145        ]
146        self.assertEqual(event_payload["change_type"], "delete")
147        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
148        self.assertEqual(event_payload["friendly_name"], dev.name)
149        self.assertEqual(event_payload["credential_type"], "fido-u2f")
150        self.assertEqual(event.payload["sub_id"]["format"], "complex")
151        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
152        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test authenticator deletion signal

def test_signal_policy_ignore(self):
154    def test_signal_policy_ignore(self):
155        """Test event not being created for user that doesn't have access to the application"""
156        PolicyBinding.objects.create(
157            target=self.application, group=Group.objects.create(name=generate_id()), order=0
158        )
159        user = create_test_user()
160        self.client.force_login(user)
161        user.set_password(generate_id())
162        user.save()
163
164        stream = Stream.objects.filter(provider=self.provider).first()
165        self.assertIsNotNone(stream)
166        event = StreamEvent.objects.filter(
167            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
168        ).first()
169        self.assertIsNone(event)

Test event not being created for user that doesn't have access to the application