authentik.enterprise.providers.ssf.tests.test_stream
from uuid import uuid4

from django.urls import reverse
from rest_framework.test import APITestCase

from authentik.core.models import Application
from authentik.core.tests.utils import create_test_cert
from authentik.enterprise.providers.ssf.models import (
    SSFEventStatus,
    SSFProvider,
    Stream,
    StreamEvent,
    StreamStatus,
)
from authentik.lib.generators import generate_id


class TestStream(APITestCase):
    """Exercise the SSF stream endpoints, authenticating with the provider's bearer token."""

    def setUp(self):
        """Create an application and an SSF provider that uses it as backchannel application."""
        app = Application.objects.create(name=generate_id(), slug=generate_id())
        self.application = app
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=app,
        )

    def test_stream_add_token(self):
        """POST a push stream with token auth and inspect the first stream event."""
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        delivery = {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
            "endpoint_url": "https://app.authentik.company",
        }
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": delivery,
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 201)

        # The request must have produced a stream for this provider ...
        created = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(created)
        # ... and that stream must carry a verification event.
        first_event = StreamEvent.objects.filter(stream=created).first()
        self.assertIsNotNone(first_event)
        self.assertEqual(first_event.status, SSFEventStatus.PENDING_FAILED)
        verification_type = "https://schemas.openid.net/secevent/ssf/event-type/verification"
        self.assertEqual(first_event.payload["events"], {verification_type: {"state": None}})

    def test_stream_add_poll(self):
        """POST a stream with the poll delivery method and expect a 400 validation error."""
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 400)
        expected_error = {
            "delivery": {"method": ["Polling for SSF events is not currently supported."]}
        }
        self.assertJSONEqual(response.content, expected_error)

    def test_stream_delete(self):
        """DELETE on the stream endpoint answers 204 and leaves the stream DISABLED."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.delete(
            url,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 204)
        # Reload the row so the status written by the view is visible here.
        target.refresh_from_db()
        self.assertEqual(target.status, StreamStatus.DISABLED)

    def test_stream_get(self):
        """GET on the stream endpoint answers 200 when a stream exists."""
        Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)

    def test_stream_get_filter_query(self):
        """GET with a stream_id query parameter returns that stream and not the other one."""
        unrelated = Stream.objects.create(provider=self.provider)
        wanted = Stream.objects.create(provider=self.provider)
        base_url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            base_url + f"?stream_id={wanted.pk}",
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        body = response.content.decode()
        self.assertIn(str(wanted.pk), body)
        self.assertNotIn(str(unrelated.pk), body)

    def test_stream_patch(self):
        """PATCH with a stream_id answers 200 with the addressed stream only."""
        unrelated = Stream.objects.create(provider=self.provider)
        wanted = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        changes = {
            "delivery": {"endpoint_url": "https://localhost"},
            "stream_id": str(wanted.pk),
        }
        response = self.client.patch(
            url,
            data=changes,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        body = response.content.decode()
        self.assertIn(str(wanted.pk), body)
        self.assertNotIn(str(unrelated.pk), body)

    def test_stream_put(self):
        """PUT with a stream_id answers 200 and stores the submitted audience."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                "endpoint_url": "https://app.authentik.company",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
            "stream_id": str(target.pk),
        }
        response = self.client.put(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        self.assertIn(str(target.pk), response.content.decode())
        # Reload the row to check what was persisted.
        target.refresh_from_db()
        self.assertEqual(target.aud, ["https://app.authentik.company"])

    def test_stream_verify(self):
        """POST to the stream-verify endpoint with a stream_id answers 204."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-verify",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.post(
            url,
            data={"stream_id": str(target.pk)},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 204)

    def test_stream_status(self):
        """GET on stream-status returns the stream's id and status as JSON."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-status",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            data={"stream_id": str(target.pk)},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        expected = {
            "stream_id": str(target.pk),
            "status": str(target.status),
        }
        self.assertJSONEqual(response.content, expected)

    def test_stream_status_not_found(self):
        """GET on stream-status with a random, unknown stream_id answers 404."""
        Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-status",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            data={"stream_id": str(uuid4())},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 404)
class
TestStream(rest_framework.test.APITestCase):
class TestStream(APITestCase):
    """Exercise the SSF stream endpoints, authenticating with the provider's bearer token."""

    def setUp(self):
        """Create an application and an SSF provider that uses it as backchannel application."""
        app = Application.objects.create(name=generate_id(), slug=generate_id())
        self.application = app
        self.provider = SSFProvider.objects.create(
            name=generate_id(),
            signing_key=create_test_cert(),
            backchannel_application=app,
        )

    def test_stream_add_token(self):
        """POST a push stream with token auth and inspect the first stream event."""
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        delivery = {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
            "endpoint_url": "https://app.authentik.company",
        }
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": delivery,
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 201)

        # The request must have produced a stream for this provider ...
        created = Stream.objects.filter(provider=self.provider).first()
        self.assertIsNotNone(created)
        # ... and that stream must carry a verification event.
        first_event = StreamEvent.objects.filter(stream=created).first()
        self.assertIsNotNone(first_event)
        self.assertEqual(first_event.status, SSFEventStatus.PENDING_FAILED)
        verification_type = "https://schemas.openid.net/secevent/ssf/event-type/verification"
        self.assertEqual(first_event.payload["events"], {verification_type: {"state": None}})

    def test_stream_add_poll(self):
        """POST a stream with the poll delivery method and expect a 400 validation error."""
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
        }
        response = self.client.post(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 400)
        expected_error = {
            "delivery": {"method": ["Polling for SSF events is not currently supported."]}
        }
        self.assertJSONEqual(response.content, expected_error)

    def test_stream_delete(self):
        """DELETE on the stream endpoint answers 204 and leaves the stream DISABLED."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.delete(
            url,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 204)
        # Reload the row so the status written by the view is visible here.
        target.refresh_from_db()
        self.assertEqual(target.status, StreamStatus.DISABLED)

    def test_stream_get(self):
        """GET on the stream endpoint answers 200 when a stream exists."""
        Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)

    def test_stream_get_filter_query(self):
        """GET with a stream_id query parameter returns that stream and not the other one."""
        unrelated = Stream.objects.create(provider=self.provider)
        wanted = Stream.objects.create(provider=self.provider)
        base_url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            base_url + f"?stream_id={wanted.pk}",
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        body = response.content.decode()
        self.assertIn(str(wanted.pk), body)
        self.assertNotIn(str(unrelated.pk), body)

    def test_stream_patch(self):
        """PATCH with a stream_id answers 200 with the addressed stream only."""
        unrelated = Stream.objects.create(provider=self.provider)
        wanted = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        changes = {
            "delivery": {"endpoint_url": "https://localhost"},
            "stream_id": str(wanted.pk),
        }
        response = self.client.patch(
            url,
            data=changes,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        body = response.content.decode()
        self.assertIn(str(wanted.pk), body)
        self.assertNotIn(str(unrelated.pk), body)

    def test_stream_put(self):
        """PUT with a stream_id answers 200 and stores the submitted audience."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream",
            kwargs={"application_slug": self.application.slug},
        )
        payload = {
            "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
            "aud": ["https://app.authentik.company"],
            "delivery": {
                "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
                "endpoint_url": "https://app.authentik.company",
            },
            "events_requested": [
                "https://schemas.openid.net/secevent/caep/event-type/credential-change",
                "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
            ],
            "format": "iss_sub",
            "stream_id": str(target.pk),
        }
        response = self.client.put(
            url,
            data=payload,
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        self.assertIn(str(target.pk), response.content.decode())
        # Reload the row to check what was persisted.
        target.refresh_from_db()
        self.assertEqual(target.aud, ["https://app.authentik.company"])

    def test_stream_verify(self):
        """POST to the stream-verify endpoint with a stream_id answers 204."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-verify",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.post(
            url,
            data={"stream_id": str(target.pk)},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 204)

    def test_stream_status(self):
        """GET on stream-status returns the stream's id and status as JSON."""
        target = Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-status",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            data={"stream_id": str(target.pk)},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 200)
        expected = {
            "stream_id": str(target.pk),
            "status": str(target.status),
        }
        self.assertJSONEqual(response.content, expected)

    def test_stream_status_not_found(self):
        """GET on stream-status with a random, unknown stream_id answers 404."""
        Stream.objects.create(provider=self.provider)
        url = reverse(
            "authentik_providers_ssf:stream-status",
            kwargs={"application_slug": self.application.slug},
        )
        response = self.client.get(
            url,
            data={"stream_id": str(uuid4())},
            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
        )
        self.assertEqual(response.status_code, 404)
Similar to TransactionTestCase, but use transaction.atomic() to achieve
test isolation.
In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).
On database backends with no transaction support, TestCase behaves as TransactionTestCase.
def
setUp(self):
def setUp(self):
    """Create an application and an SSF provider that uses it as backchannel application."""
    app = Application.objects.create(name=generate_id(), slug=generate_id())
    self.application = app
    self.provider = SSFProvider.objects.create(
        name=generate_id(),
        signing_key=create_test_cert(),
        backchannel_application=app,
    )
Hook method for setting up the test fixture before exercising it.
def
test_stream_add_token(self):
def test_stream_add_token(self):
    """POST a push stream with token auth and inspect the first stream event."""
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    delivery = {
        "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
        "endpoint_url": "https://app.authentik.company",
    }
    payload = {
        "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
        "aud": ["https://app.authentik.company"],
        "delivery": delivery,
        "events_requested": [
            "https://schemas.openid.net/secevent/caep/event-type/credential-change",
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
        ],
        "format": "iss_sub",
    }
    response = self.client.post(
        url,
        data=payload,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 201)

    # The request must have produced a stream for this provider ...
    created = Stream.objects.filter(provider=self.provider).first()
    self.assertIsNotNone(created)
    # ... and that stream must carry a verification event.
    first_event = StreamEvent.objects.filter(stream=created).first()
    self.assertIsNotNone(first_event)
    self.assertEqual(first_event.status, SSFEventStatus.PENDING_FAILED)
    verification_type = "https://schemas.openid.net/secevent/ssf/event-type/verification"
    self.assertEqual(first_event.payload["events"], {verification_type: {"state": None}})
test stream add (token auth)
def
test_stream_add_poll(self):
def test_stream_add_poll(self):
    """POST a stream with the poll delivery method and expect a 400 validation error."""
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    payload = {
        "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
        "aud": ["https://app.authentik.company"],
        "delivery": {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
        },
        "events_requested": [
            "https://schemas.openid.net/secevent/caep/event-type/credential-change",
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
        ],
        "format": "iss_sub",
    }
    response = self.client.post(
        url,
        data=payload,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 400)
    expected_error = {
        "delivery": {"method": ["Polling for SSF events is not currently supported."]}
    }
    self.assertJSONEqual(response.content, expected_error)
test stream add - poll method
def
test_stream_delete(self):
def test_stream_delete(self):
    """DELETE on the stream endpoint answers 204 and leaves the stream DISABLED."""
    target = Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.delete(
        url,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 204)
    # Reload the row so the status written by the view is visible here.
    target.refresh_from_db()
    self.assertEqual(target.status, StreamStatus.DISABLED)
delete stream
def
test_stream_get(self):
def test_stream_get(self):
    """GET on the stream endpoint answers 200 when a stream exists."""
    Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.get(
        url,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 200)
get stream
def
test_stream_get_filter_query(self):
def test_stream_get_filter_query(self):
    """GET with a stream_id query parameter returns that stream and not the other one."""
    unrelated = Stream.objects.create(provider=self.provider)
    wanted = Stream.objects.create(provider=self.provider)
    base_url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.get(
        base_url + f"?stream_id={wanted.pk}",
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 200)
    body = response.content.decode()
    self.assertIn(str(wanted.pk), body)
    self.assertNotIn(str(unrelated.pk), body)
get stream, filtered by the stream_id query parameter
def
test_stream_patch(self):
def test_stream_patch(self):
    """PATCH with a stream_id answers 200 with the addressed stream only."""
    unrelated = Stream.objects.create(provider=self.provider)
    wanted = Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    changes = {
        "delivery": {"endpoint_url": "https://localhost"},
        "stream_id": str(wanted.pk),
    }
    response = self.client.patch(
        url,
        data=changes,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 200)
    body = response.content.decode()
    self.assertIn(str(wanted.pk), body)
    self.assertNotIn(str(unrelated.pk), body)
patch stream
def
test_stream_put(self):
def test_stream_put(self):
    """PUT with a stream_id answers 200 and stores the submitted audience."""
    target = Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream",
        kwargs={"application_slug": self.application.slug},
    )
    payload = {
        "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
        "aud": ["https://app.authentik.company"],
        "delivery": {
            "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
            "endpoint_url": "https://app.authentik.company",
        },
        "events_requested": [
            "https://schemas.openid.net/secevent/caep/event-type/credential-change",
            "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
        ],
        "format": "iss_sub",
        "stream_id": str(target.pk),
    }
    response = self.client.put(
        url,
        data=payload,
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 200)
    self.assertIn(str(target.pk), response.content.decode())
    # Reload the row to check what was persisted.
    target.refresh_from_db()
    self.assertEqual(target.aud, ["https://app.authentik.company"])
put stream
def
test_stream_verify(self):
def test_stream_verify(self):
    """POST to the stream-verify endpoint with a stream_id answers 204."""
    target = Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream-verify",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.post(
        url,
        data={"stream_id": str(target.pk)},
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 204)
Test stream verify
def
test_stream_status(self):
def test_stream_status(self):
    """GET on stream-status returns the stream's id and status as JSON."""
    target = Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream-status",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.get(
        url,
        data={"stream_id": str(target.pk)},
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 200)
    expected = {
        "stream_id": str(target.pk),
        "status": str(target.status),
    }
    self.assertJSONEqual(response.content, expected)
Test stream status
def
test_stream_status_not_found(self):
def test_stream_status_not_found(self):
    """GET on stream-status with a random, unknown stream_id answers 404."""
    Stream.objects.create(provider=self.provider)
    url = reverse(
        "authentik_providers_ssf:stream-status",
        kwargs={"application_slug": self.application.slug},
    )
    response = self.client.get(
        url,
        data={"stream_id": str(uuid4())},
        HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
    )
    self.assertEqual(response.status_code, 404)
Test stream status for an unknown stream_id (expects 404)