authentik.enterprise.providers.ssf.tests.test_signals
from uuid import uuid4

from django.contrib.auth.hashers import make_password
from django.urls import reverse
from rest_framework.test import APITestCase

from authentik.core.models import Application, Group
from authentik.core.tests.utils import (
    create_test_cert,
    create_test_user,
)
from authentik.enterprise.providers.ssf.models import (
    EventTypes,
    SSFEventStatus,
    SSFProvider,
    Stream,
    StreamEvent,
)
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice


class TestSignals(APITestCase):
    """Test individual SSF Signals

    Every test performs one user-facing action and then inspects the first
    ``StreamEvent`` stored for the stream that ``setUp`` registers.
    """

    def setUp(self):
        """Create an application with an SSF provider and register a stream for it."""
        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=self.application,
        )
        res = self.client.post(
            reverse(
                "authentik_providers_ssf:stream",
                kwargs={"application_slug": self.application.slug},
            ),
            data={
                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
                "aud": ["https://app.authentik.company"],
                "delivery": {
                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                    "endpoint_url": "https://app.authentik.company",
                },
                "events_requested": [
                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
                ],
                "format": "iss_sub",
            },
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(res.status_code, 201, res.content)

    def _first_stream_event(self, **filters):
        """Return the first ``StreamEvent`` of this provider's stream, or ``None``.

        Asserts that the provider has a stream. ``filters`` are forwarded as
        additional ``StreamEvent`` queryset filters.
        """
        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        return StreamEvent.objects.filter(stream=stream, **filters).first()

    def _pending_event(self, event_type: str):
        """Return ``(event, payload)`` for the first stream event.

        Asserts that the event exists and that its status is
        ``SSFEventStatus.PENDING_FAILED``. ``payload`` is the entry stored
        under ``event_type`` in the event's ``events`` mapping.
        """
        event = self._first_stream_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        return event, event.payload["events"][event_type]

    def _assert_user_subject(self, event, user):
        """Assert the event's ``sub_id`` is complex and names ``user`` by email."""
        sub_id = event.payload["sub_id"]
        self.assertEqual(sub_id["format"], "complex")
        self.assertEqual(sub_id["user"]["format"], "email")
        self.assertEqual(sub_id["user"]["email"], user.email)

    def _assert_password_credential_change(self, user, change_type: str):
        """Assert a password credential-change event of ``change_type`` exists for ``user``."""
        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        )
        self.assertEqual(event_payload["change_type"], change_type)
        self.assertEqual(event_payload["credential_type"], "password")
        self._assert_user_subject(event, user)

    def _assert_webauthn_credential_change(self, user, dev, change_type: str):
        """Assert a ``fido-u2f`` credential-change event of ``change_type`` exists for ``dev``."""
        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        )
        self.assertEqual(event_payload["change_type"], change_type)
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self._assert_user_subject(event, user)

    def test_signal_logout(self):
        """Test user logout"""
        user = create_test_user()
        self.client.force_login(user)
        self.client.logout()

        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
        )
        self.assertEqual(event_payload["initiating_entity"], "user")
        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
        self._assert_user_subject(event, user)

    def test_signal_password_change(self):
        """Test user password change"""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(generate_id())
        user.save()

        self._assert_password_credential_change(user, "update")

    def test_signal_password_change_from_hash(self):
        """Test user password change from a pre-hashed password."""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password_from_hash(make_password(generate_id()))
        user.save()

        self._assert_password_credential_change(user, "update")

    def test_signal_password_revoke(self):
        """Test explicit password revoke."""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(None)
        user.save()

        self._assert_password_credential_change(user, "revoke")

    def test_signal_authenticator_added(self):
        """Test authenticator creation signal"""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )

        self._assert_webauthn_credential_change(user, dev, "create")

    def test_signal_authenticator_deleted(self):
        """Test authenticator deletion signal"""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )
        dev.delete()

        # The first stream event is expected to be the "delete" one here,
        # as in the original test, which also inspected only the first event.
        self._assert_webauthn_credential_change(user, dev, "delete")

    def test_signal_policy_ignore(self):
        """Test event not being created for user that doesn't have access to the application"""
        PolicyBinding.objects.create(
            target=self.application, group=Group.objects.create(name=generate_id()), order=0
        )
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(generate_id())
        user.save()

        event = self._first_stream_event(type=EventTypes.CAEP_CREDENTIAL_CHANGE)
        self.assertIsNone(event)
class
TestSignals(rest_framework.test.APITestCase):
class TestSignals(APITestCase):
    """Test individual SSF Signals

    Every test performs one user-facing action and then inspects the first
    ``StreamEvent`` stored for the stream that ``setUp`` registers.
    """

    def setUp(self):
        """Create an application with an SSF provider and register a stream for it."""
        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=self.application,
        )
        res = self.client.post(
            reverse(
                "authentik_providers_ssf:stream",
                kwargs={"application_slug": self.application.slug},
            ),
            data={
                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
                "aud": ["https://app.authentik.company"],
                "delivery": {
                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                    "endpoint_url": "https://app.authentik.company",
                },
                "events_requested": [
                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
                ],
                "format": "iss_sub",
            },
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(res.status_code, 201, res.content)

    def _first_stream_event(self, **filters):
        """Return the first ``StreamEvent`` of this provider's stream, or ``None``.

        Asserts that the provider has a stream. ``filters`` are forwarded as
        additional ``StreamEvent`` queryset filters.
        """
        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        return StreamEvent.objects.filter(stream=stream, **filters).first()

    def _pending_event(self, event_type: str):
        """Return ``(event, payload)`` for the first stream event.

        Asserts that the event exists and that its status is
        ``SSFEventStatus.PENDING_FAILED``. ``payload`` is the entry stored
        under ``event_type`` in the event's ``events`` mapping.
        """
        event = self._first_stream_event()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        return event, event.payload["events"][event_type]

    def _assert_user_subject(self, event, user):
        """Assert the event's ``sub_id`` is complex and names ``user`` by email."""
        sub_id = event.payload["sub_id"]
        self.assertEqual(sub_id["format"], "complex")
        self.assertEqual(sub_id["user"]["format"], "email")
        self.assertEqual(sub_id["user"]["email"], user.email)

    def _assert_password_credential_change(self, user, change_type: str):
        """Assert a password credential-change event of ``change_type`` exists for ``user``."""
        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        )
        self.assertEqual(event_payload["change_type"], change_type)
        self.assertEqual(event_payload["credential_type"], "password")
        self._assert_user_subject(event, user)

    def _assert_webauthn_credential_change(self, user, dev, change_type: str):
        """Assert a ``fido-u2f`` credential-change event of ``change_type`` exists for ``dev``."""
        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        )
        self.assertEqual(event_payload["change_type"], change_type)
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self._assert_user_subject(event, user)

    def test_signal_logout(self):
        """Test user logout"""
        user = create_test_user()
        self.client.force_login(user)
        self.client.logout()

        event, event_payload = self._pending_event(
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
        )
        self.assertEqual(event_payload["initiating_entity"], "user")
        self.assertEqual(event.payload["sub_id"]["session"]["format"], "opaque")
        self._assert_user_subject(event, user)

    def test_signal_password_change(self):
        """Test user password change"""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(generate_id())
        user.save()

        self._assert_password_credential_change(user, "update")

    def test_signal_password_change_from_hash(self):
        """Test user password change from a pre-hashed password."""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password_from_hash(make_password(generate_id()))
        user.save()

        self._assert_password_credential_change(user, "update")

    def test_signal_password_revoke(self):
        """Test explicit password revoke."""
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(None)
        user.save()

        self._assert_password_credential_change(user, "revoke")

    def test_signal_authenticator_added(self):
        """Test authenticator creation signal"""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )

        self._assert_webauthn_credential_change(user, dev, "create")

    def test_signal_authenticator_deleted(self):
        """Test authenticator deletion signal"""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )
        dev.delete()

        # The first stream event is expected to be the "delete" one here,
        # as in the original test, which also inspected only the first event.
        self._assert_webauthn_credential_change(user, dev, "delete")

    def test_signal_policy_ignore(self):
        """Test event not being created for user that doesn't have access to the application"""
        PolicyBinding.objects.create(
            target=self.application, group=Group.objects.create(name=generate_id()), order=0
        )
        user = create_test_user()
        self.client.force_login(user)
        user.set_password(generate_id())
        user.save()

        event = self._first_stream_event(type=EventTypes.CAEP_CREDENTIAL_CHANGE)
        self.assertIsNone(event)
Test individual SSF Signals
def
setUp(self):
def setUp(self):
    """Create an application and SSF provider, then register a stream for the provider.

    The stream is created by POSTing a configuration to the
    ``authentik_providers_ssf:stream`` endpoint, authenticated with the
    provider's token; the request must answer with HTTP 201.
    """
    app = Application.objects.create(name=generate_id(), slug=generate_id())
    self.application = app
    self.provider = SSFProvider.objects.create(
        name=generate_id(),
        signing_key=create_test_cert(),
        backchannel_application=app,
    )

    stream_url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": app.slug},
    )
    # Request both event types that the tests in this class inspect.
    requested_events = [
        "https://schemas.openid.net/secevent/caep/event-type/credential-change",
        "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
    ]
    stream_config = {
        "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
        "aud": ["https://app.authentik.company"],
        "delivery": {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
            "endpoint_url": "https://app.authentik.company",
        },
        "events_requested": requested_events,
        "format": "iss_sub",
    }
    bearer = f"Bearer {self.provider.token.key}"

    response = self.client.post(stream_url, data=stream_config, HTTP_AUTHORIZATION=bearer)
    self.assertEqual(response.status_code, 201, response.content)
Hook method for setting up the test fixture before exercising it.
def
test_signal_logout(self):
def test_signal_logout(self):
    """Logging a user out stores a session-revoked event for that user."""
    account = create_test_user()
    self.client.force_login(account)
    self.client.logout()

    ssf_stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(ssf_stream)
    recorded = StreamEvent.objects.filter(stream=ssf_stream).first()
    self.assertIsNotNone(recorded)
    self.assertEqual(recorded.status, SSFEventStatus.PENDING_FAILED)

    session_revoked = recorded.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
    ]
    self.assertEqual(session_revoked["initiating_entity"], "user")

    # The subject combines an opaque session reference with the user's email.
    subject = recorded.payload["sub_id"]
    self.assertEqual(subject["format"], "complex")
    self.assertEqual(subject["session"]["format"], "opaque")
    self.assertEqual(subject["user"]["format"], "email")
    self.assertEqual(subject["user"]["email"], account.email)
Test user logout
def
test_signal_password_change(self):
def test_signal_password_change(self):
    """Setting a new password stores an "update" password credential-change event."""
    account = create_test_user()
    self.client.force_login(account)

    new_password = generate_id()
    account.set_password(new_password)
    account.save()

    self._assert_password_credential_change(account, "update")
Test user password change
def
test_signal_password_change_from_hash(self):
def test_signal_password_change_from_hash(self):
    """Setting a pre-hashed password stores an "update" password credential-change event."""
    account = create_test_user()
    self.client.force_login(account)

    hashed_password = make_password(generate_id())
    account.set_password_from_hash(hashed_password)
    account.save()

    self._assert_password_credential_change(account, "update")
Test user password change from a pre-hashed password.
def
test_signal_password_revoke(self):
def test_signal_password_revoke(self):
    """Setting the password to ``None`` stores a "revoke" password credential-change event."""
    account = create_test_user()
    self.client.force_login(account)

    account.set_password(None)
    account.save()

    self._assert_password_credential_change(account, "revoke")
Test explicit password revoke.
def
test_signal_authenticator_added(self):
def test_signal_authenticator_added(self):
    """Test authenticator creation signal

    Creating a ``WebAuthnDevice`` for a user must store a "create"
    credential-change event that describes the device and names the user
    by email.
    """
    user = create_test_user()
    self.client.force_login(user)
    dev = WebAuthnDevice.objects.create(
        user=user,
        name=generate_id(),
        credential_id=generate_id(),
        public_key=generate_id(),
        aaguid=str(uuid4()),
    )

    stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(stream)
    # An argument-less ``.exclude()`` does not narrow the queryset, so the
    # lookup is written without it.
    event = StreamEvent.objects.filter(stream=stream).first()
    self.assertIsNotNone(event)
    self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
    event_payload = event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    ]
    self.assertEqual(event_payload["change_type"], "create")
    self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
    self.assertEqual(event_payload["friendly_name"], dev.name)
    self.assertEqual(event_payload["credential_type"], "fido-u2f")
    self.assertEqual(event.payload["sub_id"]["format"], "complex")
    self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
    self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
Test authenticator creation signal
def
test_signal_authenticator_deleted(self):
def test_signal_authenticator_deleted(self):
    """Test authenticator deletion signal

    Creating and then deleting a ``WebAuthnDevice`` must leave a "delete"
    credential-change event as the first event of the stream, describing
    the device and naming the user by email.
    """
    user = create_test_user()
    self.client.force_login(user)
    dev = WebAuthnDevice.objects.create(
        user=user,
        name=generate_id(),
        credential_id=generate_id(),
        public_key=generate_id(),
        aaguid=str(uuid4()),
    )
    dev.delete()

    stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(stream)
    # An argument-less ``.exclude()`` does not narrow the queryset, so the
    # lookup is written without it.
    event = StreamEvent.objects.filter(stream=stream).first()
    self.assertIsNotNone(event)
    self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
    event_payload = event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    ]
    self.assertEqual(event_payload["change_type"], "delete")
    self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
    self.assertEqual(event_payload["friendly_name"], dev.name)
    self.assertEqual(event_payload["credential_type"], "fido-u2f")
    self.assertEqual(event.payload["sub_id"]["format"], "complex")
    self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
    self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
Test authenticator deletion signal
def
test_signal_policy_ignore(self):
def test_signal_policy_ignore(self):
    """No credential-change event is stored when the application is bound to a group.

    The application gets a policy binding to a freshly created group, and the
    test user is never added to that group.
    """
    restricted_group = Group.objects.create(name=generate_id())
    PolicyBinding.objects.create(target=self.application, group=restricted_group, order=0)

    account = create_test_user()
    self.client.force_login(account)
    account.set_password(generate_id())
    account.save()

    ssf_stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(ssf_stream)
    credential_events = StreamEvent.objects.filter(
        stream=ssf_stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
    )
    self.assertIsNone(credential_events.first())
Test event not being created for user that doesn't have access to the application