authentik.enterprise.providers.ssf.tests.test_signals

  1from uuid import uuid4
  2
  3from django.contrib.auth.hashers import make_password
  4from django.urls import reverse
  5from rest_framework.test import APITestCase
  6
  7from authentik.core.models import Application, Group
  8from authentik.core.tests.utils import (
  9    create_test_cert,
 10    create_test_user,
 11)
 12from authentik.enterprise.providers.ssf.models import (
 13    EventTypes,
 14    SSFEventStatus,
 15    SSFProvider,
 16    Stream,
 17    StreamEvent,
 18)
 19from authentik.lib.generators import generate_id
 20from authentik.policies.models import PolicyBinding
 21from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
 22
 23
 24class TestSignals(APITestCase):
 25    """Test individual SSF Signals"""
 26
 27    def setUp(self):
 28        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 29        self.provider = SSFProvider.objects.create(
 30            name=generate_id(),
 31            signing_key=create_test_cert(),
 32            backchannel_application=self.application,
 33        )
 34        res = self.client.post(
 35            reverse(
 36                "authentik_providers_ssf:stream",
 37                kwargs={"application_slug": self.application.slug},
 38            ),
 39            data={
 40                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 41                "aud": ["https://app.authentik.company"],
 42                "delivery": {
 43                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 44                    "endpoint_url": "https://app.authentik.company",
 45                },
 46                "events_requested": [
 47                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 48                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 49                ],
 50                "format": "iss_sub",
 51            },
 52            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 53        )
 54        self.assertEqual(res.status_code, 201, res.content)
 55
 56    def _assert_password_credential_change(self, user, change_type: str):
 57        stream = Stream.objects.filter(provider=self.provider).first()
 58        self.assertIsNotNone(stream)
 59        event = StreamEvent.objects.filter(stream=stream).first()
 60        self.assertIsNotNone(event)
 61        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 62        event_payload = event.payload["events"][
 63            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
 64        ]
 65        self.assertEqual(event_payload["change_type"], change_type)
 66        self.assertEqual(event_payload["credential_type"], "password")
 67        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 68        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 69        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 70
 71    def test_signal_logout(self):
 72        """Test user logout"""
 73        user = create_test_user()
 74        self.client.force_login(user)
 75        self.client.logout()
 76
 77        stream = Stream.objects.filter(provider=self.provider).first()
 78        self.assertIsNotNone(stream)
 79        event = StreamEvent.objects.filter(stream=stream).first()
 80        self.assertIsNotNone(event)
 81        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 82        event_payload = event.payload["events"][
 83            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
 84        ]
 85        self.assertEqual(event_payload["initiating_entity"], "user")
 86        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 87        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
 88        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 89        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 90
 91    def test_signal_password_change(self):
 92        """Test user password change"""
 93        user = create_test_user()
 94        self.client.force_login(user)
 95        user.set_password(generate_id())
 96        user.save()
 97
 98        self._assert_password_credential_change(user, "update")
 99
100    def test_signal_password_change_from_hash(self):
101        """Test user password change from a pre-hashed password."""
102        user = create_test_user()
103        self.client.force_login(user)
104        user.set_password_from_hash(make_password(generate_id()))
105        user.save()
106
107        self._assert_password_credential_change(user, "update")
108
109    def test_signal_password_revoke(self):
110        """Test explicit password revoke."""
111        user = create_test_user()
112        self.client.force_login(user)
113        user.set_password(None)
114        user.save()
115
116        self._assert_password_credential_change(user, "revoke")
117
118    def test_signal_authenticator_added(self):
119        """Test authenticator creation signal"""
120        user = create_test_user()
121        self.client.force_login(user)
122        dev = WebAuthnDevice.objects.create(
123            user=user,
124            name=generate_id(),
125            credential_id=generate_id(),
126            public_key=generate_id(),
127            aaguid=str(uuid4()),
128        )
129
130        stream = Stream.objects.filter(provider=self.provider).first()
131        self.assertIsNotNone(stream)
132        event = StreamEvent.objects.filter(stream=stream).exclude().first()
133        self.assertIsNotNone(event)
134        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
135        event_payload = event.payload["events"][
136            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
137        ]
138        self.assertEqual(event_payload["change_type"], "create")
139        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
140        self.assertEqual(event_payload["friendly_name"], dev.name)
141        self.assertEqual(event_payload["credential_type"], "fido-u2f")
142        self.assertEqual(event.payload["sub_id"]["format"], "complex")
143        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
144        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
145
146    def test_signal_authenticator_deleted(self):
147        """Test authenticator deletion signal"""
148        user = create_test_user()
149        self.client.force_login(user)
150        dev = WebAuthnDevice.objects.create(
151            user=user,
152            name=generate_id(),
153            credential_id=generate_id(),
154            public_key=generate_id(),
155            aaguid=str(uuid4()),
156        )
157        dev.delete()
158
159        stream = Stream.objects.filter(provider=self.provider).first()
160        self.assertIsNotNone(stream)
161        event = StreamEvent.objects.filter(stream=stream).exclude().first()
162        self.assertIsNotNone(event)
163        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
164        event_payload = event.payload["events"][
165            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
166        ]
167        self.assertEqual(event_payload["change_type"], "delete")
168        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
169        self.assertEqual(event_payload["friendly_name"], dev.name)
170        self.assertEqual(event_payload["credential_type"], "fido-u2f")
171        self.assertEqual(event.payload["sub_id"]["format"], "complex")
172        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
173        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
174
175    def test_signal_policy_ignore(self):
176        """Test event not being created for user that doesn't have access to the application"""
177        PolicyBinding.objects.create(
178            target=self.application, group=Group.objects.create(name=generate_id()), order=0
179        )
180        user = create_test_user()
181        self.client.force_login(user)
182        user.set_password(generate_id())
183        user.save()
184
185        stream = Stream.objects.filter(provider=self.provider).first()
186        self.assertIsNotNone(stream)
187        event = StreamEvent.objects.filter(
188            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
189        ).first()
190        self.assertIsNone(event)
class TestSignals(rest_framework.test.APITestCase):
 25class TestSignals(APITestCase):
 26    """Test individual SSF Signals"""
 27
 28    def setUp(self):
 29        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 30        self.provider = SSFProvider.objects.create(
 31            name=generate_id(),
 32            signing_key=create_test_cert(),
 33            backchannel_application=self.application,
 34        )
 35        res = self.client.post(
 36            reverse(
 37                "authentik_providers_ssf:stream",
 38                kwargs={"application_slug": self.application.slug},
 39            ),
 40            data={
 41                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 42                "aud": ["https://app.authentik.company"],
 43                "delivery": {
 44                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 45                    "endpoint_url": "https://app.authentik.company",
 46                },
 47                "events_requested": [
 48                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 49                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 50                ],
 51                "format": "iss_sub",
 52            },
 53            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 54        )
 55        self.assertEqual(res.status_code, 201, res.content)
 56
 57    def _assert_password_credential_change(self, user, change_type: str):
 58        stream = Stream.objects.filter(provider=self.provider).first()
 59        self.assertIsNotNone(stream)
 60        event = StreamEvent.objects.filter(stream=stream).first()
 61        self.assertIsNotNone(event)
 62        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 63        event_payload = event.payload["events"][
 64            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
 65        ]
 66        self.assertEqual(event_payload["change_type"], change_type)
 67        self.assertEqual(event_payload["credential_type"], "password")
 68        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 69        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 70        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 71
 72    def test_signal_logout(self):
 73        """Test user logout"""
 74        user = create_test_user()
 75        self.client.force_login(user)
 76        self.client.logout()
 77
 78        stream = Stream.objects.filter(provider=self.provider).first()
 79        self.assertIsNotNone(stream)
 80        event = StreamEvent.objects.filter(stream=stream).first()
 81        self.assertIsNotNone(event)
 82        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 83        event_payload = event.payload["events"][
 84            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
 85        ]
 86        self.assertEqual(event_payload["initiating_entity"], "user")
 87        self.assertEqual(event.payload["sub_id"]["format"], "complex")
 88        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
 89        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
 90        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
 91
 92    def test_signal_password_change(self):
 93        """Test user password change"""
 94        user = create_test_user()
 95        self.client.force_login(user)
 96        user.set_password(generate_id())
 97        user.save()
 98
 99        self._assert_password_credential_change(user, "update")
100
101    def test_signal_password_change_from_hash(self):
102        """Test user password change from a pre-hashed password."""
103        user = create_test_user()
104        self.client.force_login(user)
105        user.set_password_from_hash(make_password(generate_id()))
106        user.save()
107
108        self._assert_password_credential_change(user, "update")
109
110    def test_signal_password_revoke(self):
111        """Test explicit password revoke."""
112        user = create_test_user()
113        self.client.force_login(user)
114        user.set_password(None)
115        user.save()
116
117        self._assert_password_credential_change(user, "revoke")
118
119    def test_signal_authenticator_added(self):
120        """Test authenticator creation signal"""
121        user = create_test_user()
122        self.client.force_login(user)
123        dev = WebAuthnDevice.objects.create(
124            user=user,
125            name=generate_id(),
126            credential_id=generate_id(),
127            public_key=generate_id(),
128            aaguid=str(uuid4()),
129        )
130
131        stream = Stream.objects.filter(provider=self.provider).first()
132        self.assertIsNotNone(stream)
133        event = StreamEvent.objects.filter(stream=stream).exclude().first()
134        self.assertIsNotNone(event)
135        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
136        event_payload = event.payload["events"][
137            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
138        ]
139        self.assertEqual(event_payload["change_type"], "create")
140        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
141        self.assertEqual(event_payload["friendly_name"], dev.name)
142        self.assertEqual(event_payload["credential_type"], "fido-u2f")
143        self.assertEqual(event.payload["sub_id"]["format"], "complex")
144        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
145        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
146
147    def test_signal_authenticator_deleted(self):
148        """Test authenticator deletion signal"""
149        user = create_test_user()
150        self.client.force_login(user)
151        dev = WebAuthnDevice.objects.create(
152            user=user,
153            name=generate_id(),
154            credential_id=generate_id(),
155            public_key=generate_id(),
156            aaguid=str(uuid4()),
157        )
158        dev.delete()
159
160        stream = Stream.objects.filter(provider=self.provider).first()
161        self.assertIsNotNone(stream)
162        event = StreamEvent.objects.filter(stream=stream).exclude().first()
163        self.assertIsNotNone(event)
164        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
165        event_payload = event.payload["events"][
166            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
167        ]
168        self.assertEqual(event_payload["change_type"], "delete")
169        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
170        self.assertEqual(event_payload["friendly_name"], dev.name)
171        self.assertEqual(event_payload["credential_type"], "fido-u2f")
172        self.assertEqual(event.payload["sub_id"]["format"], "complex")
173        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
174        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
175
176    def test_signal_policy_ignore(self):
177        """Test event not being created for user that doesn't have access to the application"""
178        PolicyBinding.objects.create(
179            target=self.application, group=Group.objects.create(name=generate_id()), order=0
180        )
181        user = create_test_user()
182        self.client.force_login(user)
183        user.set_password(generate_id())
184        user.save()
185
186        stream = Stream.objects.filter(provider=self.provider).first()
187        self.assertIsNotNone(stream)
188        event = StreamEvent.objects.filter(
189            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
190        ).first()
191        self.assertIsNone(event)

Test individual SSF Signals

def setUp(self):
28    def setUp(self):
29        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
30        self.provider = SSFProvider.objects.create(
31            name=generate_id(),
32            signing_key=create_test_cert(),
33            backchannel_application=self.application,
34        )
35        res = self.client.post(
36            reverse(
37                "authentik_providers_ssf:stream",
38                kwargs={"application_slug": self.application.slug},
39            ),
40            data={
41                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
42                "aud": ["https://app.authentik.company"],
43                "delivery": {
44                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
45                    "endpoint_url": "https://app.authentik.company",
46                },
47                "events_requested": [
48                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
49                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
50                ],
51                "format": "iss_sub",
52            },
53            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
54        )
55        self.assertEqual(res.status_code, 201, res.content)

Hook method for setting up the test fixture before exercising it.

def test_signal_logout(self):
72    def test_signal_logout(self):
73        """Test user logout"""
74        user = create_test_user()
75        self.client.force_login(user)
76        self.client.logout()
77
78        stream = Stream.objects.filter(provider=self.provider).first()
79        self.assertIsNotNone(stream)
80        event = StreamEvent.objects.filter(stream=stream).first()
81        self.assertIsNotNone(event)
82        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
83        event_payload = event.payload["events"][
84            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
85        ]
86        self.assertEqual(event_payload["initiating_entity"], "user")
87        self.assertEqual(event.payload["sub_id"]["format"], "complex")
88        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
89        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
90        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test user logout

def test_signal_password_change(self):
92    def test_signal_password_change(self):
93        """Test user password change"""
94        user = create_test_user()
95        self.client.force_login(user)
96        user.set_password(generate_id())
97        user.save()
98
99        self._assert_password_credential_change(user, "update")

Test user password change

def test_signal_password_change_from_hash(self):
101    def test_signal_password_change_from_hash(self):
102        """Test user password change from a pre-hashed password."""
103        user = create_test_user()
104        self.client.force_login(user)
105        user.set_password_from_hash(make_password(generate_id()))
106        user.save()
107
108        self._assert_password_credential_change(user, "update")

Test user password change from a pre-hashed password.

def test_signal_password_revoke(self):
110    def test_signal_password_revoke(self):
111        """Test explicit password revoke."""
112        user = create_test_user()
113        self.client.force_login(user)
114        user.set_password(None)
115        user.save()
116
117        self._assert_password_credential_change(user, "revoke")

Test explicit password revoke.

def test_signal_authenticator_added(self):
119    def test_signal_authenticator_added(self):
120        """Test authenticator creation signal"""
121        user = create_test_user()
122        self.client.force_login(user)
123        dev = WebAuthnDevice.objects.create(
124            user=user,
125            name=generate_id(),
126            credential_id=generate_id(),
127            public_key=generate_id(),
128            aaguid=str(uuid4()),
129        )
130
131        stream = Stream.objects.filter(provider=self.provider).first()
132        self.assertIsNotNone(stream)
133        event = StreamEvent.objects.filter(stream=stream).exclude().first()
134        self.assertIsNotNone(event)
135        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
136        event_payload = event.payload["events"][
137            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
138        ]
139        self.assertEqual(event_payload["change_type"], "create")
140        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
141        self.assertEqual(event_payload["friendly_name"], dev.name)
142        self.assertEqual(event_payload["credential_type"], "fido-u2f")
143        self.assertEqual(event.payload["sub_id"]["format"], "complex")
144        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
145        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test authenticator creation signal

def test_signal_authenticator_deleted(self):
147    def test_signal_authenticator_deleted(self):
148        """Test authenticator deletion signal"""
149        user = create_test_user()
150        self.client.force_login(user)
151        dev = WebAuthnDevice.objects.create(
152            user=user,
153            name=generate_id(),
154            credential_id=generate_id(),
155            public_key=generate_id(),
156            aaguid=str(uuid4()),
157        )
158        dev.delete()
159
160        stream = Stream.objects.filter(provider=self.provider).first()
161        self.assertIsNotNone(stream)
162        event = StreamEvent.objects.filter(stream=stream).exclude().first()
163        self.assertIsNotNone(event)
164        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
165        event_payload = event.payload["events"][
166            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
167        ]
168        self.assertEqual(event_payload["change_type"], "delete")
169        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
170        self.assertEqual(event_payload["friendly_name"], dev.name)
171        self.assertEqual(event_payload["credential_type"], "fido-u2f")
172        self.assertEqual(event.payload["sub_id"]["format"], "complex")
173        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
174        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

Test authenticator deletion signal

def test_signal_policy_ignore(self):
176    def test_signal_policy_ignore(self):
177        """Test event not being created for user that doesn't have access to the application"""
178        PolicyBinding.objects.create(
179            target=self.application, group=Group.objects.create(name=generate_id()), order=0
180        )
181        user = create_test_user()
182        self.client.force_login(user)
183        user.set_password(generate_id())
184        user.save()
185
186        stream = Stream.objects.filter(provider=self.provider).first()
187        self.assertIsNotNone(stream)
188        event = StreamEvent.objects.filter(
189            stream=stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
190        ).first()
191        self.assertIsNone(event)

Test event not being created for user that doesn't have access to the application