authentik.enterprise.providers.ssf.tests.test_signals
from uuid import uuid4

from django.urls import reverse
from rest_framework.test import APITestCase

from authentik.core.models import Application, Group
from authentik.core.tests.utils import (
    create_test_cert,
    create_test_user,
)
from authentik.enterprise.providers.ssf.models import (
    EventTypes,
    SSFEventStatus,
    SSFProvider,
    Stream,
    StreamEvent,
)
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.stages.authenticator_webauthn.models import WebAuthnDevice


class TestSignals(APITestCase):
    """Test individual SSF Signals

    Each test performs one user-facing action (logout, password change,
    authenticator add/delete) and inspects the StreamEvent stored for the
    stream that setUp registers."""

    def setUp(self):
        """Create an application with an SSF provider and POST a stream configuration for it"""
        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=self.application,
        )
        stream_url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        # Requests the two CAEP event types that the tests in this class look for
        stream_config = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                "endpoint_url": "https://app.authentik.company",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            stream_url,
            data=stream_config,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        # The response body doubles as the failure message
        self.assertEqual(response.status_code, 201, response.content)

    def test_signal_logout(self):
        """Logging out stores a session-revoked event for the stream"""
        account = create_test_user()
        self.client.force_login(account)
        self.client.logout()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
        self.assertIsNotNone(stream_event)
        self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
        session_revoked = stream_event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
        ]
        self.assertEqual(session_revoked["initiating_entity"], "user")
        subject = stream_event.payload["sub_id"]
        self.assertEqual(subject["format"], "complex")
        self.assertEqual(subject["session"]["format"], "opaque")
        self.assertEqual(subject["user"]["format"], "email")
        self.assertEqual(subject["user"]["email"], account.email)

    def test_signal_password_change(self):
        """Changing a password stores a credential-change event of type "update\""""
        account = create_test_user()
        self.client.force_login(account)
        account.set_password(generate_id())
        account.save()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
        self.assertIsNotNone(stream_event)
        self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
        credential_change = stream_event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(credential_change["change_type"], "update")
        self.assertEqual(credential_change["credential_type"], "password")
        subject = stream_event.payload["sub_id"]
        self.assertEqual(subject["format"], "complex")
        self.assertEqual(subject["user"]["format"], "email")
        self.assertEqual(subject["user"]["email"], account.email)

    def test_signal_authenticator_added(self):
        """Test authenticator creation signal

        Creating a WebAuthn device should store a credential-change event of
        type "create" that carries the device's aaguid and name."""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )

        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        # An argument-less .exclude() filters nothing, so it was dropped
        event = StreamEvent.objects.filter(stream=stream).first()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        event_payload = event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(event_payload["change_type"], "create")
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self.assertEqual(event.payload["sub_id"]["format"], "complex")
        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

    def test_signal_authenticator_deleted(self):
        """Test authenticator deletion signal

        Deleting a WebAuthn device should store a credential-change event of
        type "delete" that carries the device's aaguid and name."""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )
        dev.delete()

        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        # An argument-less .exclude() filters nothing, so it was dropped.
        # NOTE(review): creating the device above presumably stores its own
        # "create" event (see test_signal_authenticator_added), so this relies
        # on .first() returning the "delete" event - confirm StreamEvent ordering.
        event = StreamEvent.objects.filter(stream=stream).first()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        event_payload = event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(event_payload["change_type"], "delete")
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self.assertEqual(event.payload["sub_id"]["format"], "complex")
        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

    def test_signal_policy_ignore(self):
        """No credential-change event for a user without access to the application"""
        # Bind the application to a fresh group that the test user is not added to
        unrelated_group = Group.objects.create(name=generate_id())
        PolicyBinding.objects.create(target=self.application, group=unrelated_group, order=0)
        account = create_test_user()
        self.client.force_login(account)
        account.set_password(generate_id())
        account.save()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        credential_events = StreamEvent.objects.filter(
            stream=ssf_stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
        )
        self.assertIsNone(credential_events.first())
class
TestSignals(rest_framework.test.APITestCase):
class TestSignals(APITestCase):
    """Test individual SSF Signals

    Each test performs one user-facing action (logout, password change,
    authenticator add/delete) and inspects the StreamEvent stored for the
    stream that setUp registers."""

    def setUp(self):
        """Create an application with an SSF provider and POST a stream configuration for it"""
        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=self.application,
        )
        stream_url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        # Requests the two CAEP event types that the tests in this class look for
        stream_config = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                "endpoint_url": "https://app.authentik.company",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            stream_url,
            data=stream_config,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        # The response body doubles as the failure message
        self.assertEqual(response.status_code, 201, response.content)

    def test_signal_logout(self):
        """Logging out stores a session-revoked event for the stream"""
        account = create_test_user()
        self.client.force_login(account)
        self.client.logout()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
        self.assertIsNotNone(stream_event)
        self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
        session_revoked = stream_event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
        ]
        self.assertEqual(session_revoked["initiating_entity"], "user")
        subject = stream_event.payload["sub_id"]
        self.assertEqual(subject["format"], "complex")
        self.assertEqual(subject["session"]["format"], "opaque")
        self.assertEqual(subject["user"]["format"], "email")
        self.assertEqual(subject["user"]["email"], account.email)

    def test_signal_password_change(self):
        """Changing a password stores a credential-change event of type "update\""""
        account = create_test_user()
        self.client.force_login(account)
        account.set_password(generate_id())
        account.save()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
        self.assertIsNotNone(stream_event)
        self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
        credential_change = stream_event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(credential_change["change_type"], "update")
        self.assertEqual(credential_change["credential_type"], "password")
        subject = stream_event.payload["sub_id"]
        self.assertEqual(subject["format"], "complex")
        self.assertEqual(subject["user"]["format"], "email")
        self.assertEqual(subject["user"]["email"], account.email)

    def test_signal_authenticator_added(self):
        """Test authenticator creation signal

        Creating a WebAuthn device should store a credential-change event of
        type "create" that carries the device's aaguid and name."""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )

        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        # An argument-less .exclude() filters nothing, so it was dropped
        event = StreamEvent.objects.filter(stream=stream).first()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        event_payload = event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(event_payload["change_type"], "create")
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self.assertEqual(event.payload["sub_id"]["format"], "complex")
        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

    def test_signal_authenticator_deleted(self):
        """Test authenticator deletion signal

        Deleting a WebAuthn device should store a credential-change event of
        type "delete" that carries the device's aaguid and name."""
        user = create_test_user()
        self.client.force_login(user)
        dev = WebAuthnDevice.objects.create(
            user=user,
            name=generate_id(),
            credential_id=generate_id(),
            public_key=generate_id(),
            aaguid=str(uuid4()),
        )
        dev.delete()

        stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(stream)
        # An argument-less .exclude() filters nothing, so it was dropped.
        # NOTE(review): creating the device above presumably stores its own
        # "create" event (see test_signal_authenticator_added), so this relies
        # on .first() returning the "delete" event - confirm StreamEvent ordering.
        event = StreamEvent.objects.filter(stream=stream).first()
        self.assertIsNotNone(event)
        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
        event_payload = event.payload["events"][
            "https://schemas.openid.net/secevent/caep/event-type/credential-change"
        ]
        self.assertEqual(event_payload["change_type"], "delete")
        self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
        self.assertEqual(event_payload["friendly_name"], dev.name)
        self.assertEqual(event_payload["credential_type"], "fido-u2f")
        self.assertEqual(event.payload["sub_id"]["format"], "complex")
        self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
        self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)

    def test_signal_policy_ignore(self):
        """No credential-change event for a user without access to the application"""
        # Bind the application to a fresh group that the test user is not added to
        unrelated_group = Group.objects.create(name=generate_id())
        PolicyBinding.objects.create(target=self.application, group=unrelated_group, order=0)
        account = create_test_user()
        self.client.force_login(account)
        account.set_password(generate_id())
        account.save()

        ssf_stream = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(ssf_stream)
        credential_events = StreamEvent.objects.filter(
            stream=ssf_stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
        )
        self.assertIsNone(credential_events.first())
Test individual SSF Signals
def
setUp(self):
def setUp(self):
    """Create an application with an SSF provider and POST a stream configuration for it"""
    self.application = Application.objects.create(name=generate_id(), slug=generate_id())
    self.provider = SSFProvider.objects.create(
        name=generate_id(),
        signing_key=create_test_cert(),
        backchannel_application=self.application,
    )
    stream_url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    # Requests the two CAEP event types that the tests in this class look for
    stream_config = {
        "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
        "aud": ["https://app.authentik.company"],
        "delivery": {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
            "endpoint_url": "https://app.authentik.company",
        },
        "events_requested": [
            "https://schemas.openid.net/secevent/caep/event-type/credential-change",
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
        ],
        "format": "iss_sub",
    }
    response = self.client.post(
        stream_url,
        data=stream_config,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    # The response body doubles as the failure message
    self.assertEqual(response.status_code, 201, response.content)
Hook method for setting up the test fixture before exercising it.
def
test_signal_logout(self):
def test_signal_logout(self):
    """Logging out stores a session-revoked event for the stream"""
    account = create_test_user()
    self.client.force_login(account)
    self.client.logout()

    ssf_stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(ssf_stream)
    stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
    self.assertIsNotNone(stream_event)
    self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
    session_revoked = stream_event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
    ]
    self.assertEqual(session_revoked["initiating_entity"], "user")
    subject = stream_event.payload["sub_id"]
    self.assertEqual(subject["format"], "complex")
    self.assertEqual(subject["session"]["format"], "opaque")
    self.assertEqual(subject["user"]["format"], "email")
    self.assertEqual(subject["user"]["email"], account.email)
Test user logout
def
test_signal_password_change(self):
def test_signal_password_change(self):
    """Changing a password stores a credential-change event of type "update\""""
    account = create_test_user()
    self.client.force_login(account)
    account.set_password(generate_id())
    account.save()

    ssf_stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(ssf_stream)
    stream_event = StreamEvent.objects.filter(stream=ssf_stream).first()
    self.assertIsNotNone(stream_event)
    self.assertEqual(stream_event.status, SSFEventStatus.PENDING_FAILED)
    credential_change = stream_event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    ]
    self.assertEqual(credential_change["change_type"], "update")
    self.assertEqual(credential_change["credential_type"], "password")
    subject = stream_event.payload["sub_id"]
    self.assertEqual(subject["format"], "complex")
    self.assertEqual(subject["user"]["format"], "email")
    self.assertEqual(subject["user"]["email"], account.email)
Test user password change
def
test_signal_authenticator_added(self):
def test_signal_authenticator_added(self):
    """Test authenticator creation signal

    Creating a WebAuthn device should store a credential-change event of
    type "create" that carries the device's aaguid and name."""
    user = create_test_user()
    self.client.force_login(user)
    dev = WebAuthnDevice.objects.create(
        user=user,
        name=generate_id(),
        credential_id=generate_id(),
        public_key=generate_id(),
        aaguid=str(uuid4()),
    )

    stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(stream)
    # An argument-less .exclude() filters nothing, so it was dropped
    event = StreamEvent.objects.filter(stream=stream).first()
    self.assertIsNotNone(event)
    self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
    event_payload = event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    ]
    self.assertEqual(event_payload["change_type"], "create")
    self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
    self.assertEqual(event_payload["friendly_name"], dev.name)
    self.assertEqual(event_payload["credential_type"], "fido-u2f")
    self.assertEqual(event.payload["sub_id"]["format"], "complex")
    self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
    self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
Test authenticator creation signal
def
test_signal_authenticator_deleted(self):
def test_signal_authenticator_deleted(self):
    """Test authenticator deletion signal

    Deleting a WebAuthn device should store a credential-change event of
    type "delete" that carries the device's aaguid and name."""
    user = create_test_user()
    self.client.force_login(user)
    dev = WebAuthnDevice.objects.create(
        user=user,
        name=generate_id(),
        credential_id=generate_id(),
        public_key=generate_id(),
        aaguid=str(uuid4()),
    )
    dev.delete()

    stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(stream)
    # An argument-less .exclude() filters nothing, so it was dropped.
    # NOTE(review): creating the device above presumably stores its own
    # "create" event (see test_signal_authenticator_added), so this relies
    # on .first() returning the "delete" event - confirm StreamEvent ordering.
    event = StreamEvent.objects.filter(stream=stream).first()
    self.assertIsNotNone(event)
    self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
    event_payload = event.payload["events"][
        "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    ]
    self.assertEqual(event_payload["change_type"], "delete")
    self.assertEqual(event_payload["fido2_aaguid"], dev.aaguid)
    self.assertEqual(event_payload["friendly_name"], dev.name)
    self.assertEqual(event_payload["credential_type"], "fido-u2f")
    self.assertEqual(event.payload["sub_id"]["format"], "complex")
    self.assertEqual(event.payload["sub_id"]["user"]["format"], "email")
    self.assertEqual(event.payload["sub_id"]["user"]["email"], user.email)
Test authenticator deletion signal
def
test_signal_policy_ignore(self):
def test_signal_policy_ignore(self):
    """No credential-change event for a user without access to the application"""
    # Bind the application to a fresh group that the test user is not added to
    unrelated_group = Group.objects.create(name=generate_id())
    PolicyBinding.objects.create(target=self.application, group=unrelated_group, order=0)
    account = create_test_user()
    self.client.force_login(account)
    account.set_password(generate_id())
    account.save()

    ssf_stream = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(ssf_stream)
    credential_events = StreamEvent.objects.filter(
        stream=ssf_stream, type=EventTypes.CAEP_CREDENTIAL_CHANGE
    )
    self.assertIsNone(credential_events.first())
Test event not being created for user that doesn't have access to the application