authentik.enterprise.providers.ssf.tests.test_auth
1import json 2from dataclasses import asdict 3 4from django.urls import reverse 5from django.utils import timezone 6from rest_framework.test import APITestCase 7 8from authentik.core.models import Application, Token, TokenIntents 9from authentik.core.tests.utils import ( 10 create_test_admin_user, 11 create_test_cert, 12 create_test_flow, 13 create_test_user, 14) 15from authentik.enterprise.providers.ssf.models import ( 16 SSFEventStatus, 17 SSFProvider, 18 Stream, 19 StreamEvent, 20) 21from authentik.lib.generators import generate_id 22from authentik.providers.oauth2.id_token import IDToken 23from authentik.providers.oauth2.models import AccessToken, OAuth2Provider 24 25 26class TestSSFAuth(APITestCase): 27 def setUp(self): 28 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 29 self.provider = SSFProvider.objects.create( 30 name=generate_id(), 31 signing_key=create_test_cert(), 32 backchannel_application=self.application, 33 ) 34 35 def test_stream_add_token(self): 36 """test stream add (token auth)""" 37 res = self.client.post( 38 reverse( 39 "authentik_providers_ssf:stream", 40 kwargs={"application_slug": self.application.slug}, 41 ), 42 data={ 43 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 44 "aud": ["https://app.authentik.company"], 45 "delivery": { 46 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 47 "endpoint_url": "https://app.authentik.company", 48 }, 49 "events_requested": [ 50 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 51 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 52 ], 53 "format": "iss_sub", 54 }, 55 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}", 56 ) 57 self.assertEqual(res.status_code, 201) 58 stream = Stream.objects.filter(provider=self.provider).first() 59 self.assertIsNotNone(stream) 60 event = StreamEvent.objects.filter(stream=stream).first() 61 self.assertIsNotNone(event) 62 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 63 self.assertEqual( 64 event.payload["events"], 65 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 66 ) 67 68 def test_stream_add_oidc(self): 69 """test stream add (oidc auth)""" 70 provider = OAuth2Provider.objects.create( 71 name=generate_id(), 72 authorization_flow=create_test_flow(), 73 ) 74 self.application.provider = provider 75 self.application.save() 76 user = create_test_admin_user() 77 token = AccessToken.objects.create( 78 provider=provider, 79 user=user, 80 token=generate_id(), 81 auth_time=timezone.now(), 82 _scope="openid user profile", 83 _id_token=json.dumps( 84 asdict( 85 IDToken("foo", "bar"), 86 ) 87 ), 88 ) 89 90 res = self.client.post( 91 reverse( 92 "authentik_providers_ssf:stream", 93 kwargs={"application_slug": self.application.slug}, 94 ), 95 data={ 96 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 97 "aud": ["https://app.authentik.company"], 98 "delivery": { 99 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 100 "endpoint_url": "https://app.authentik.company", 101 }, 102 "events_requested": [ 103 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 104 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 105 ], 106 "format": "iss_sub", 107 }, 108 HTTP_AUTHORIZATION=f"Bearer {token.token}", 109 ) 110 self.assertEqual(res.status_code, 201) 111 stream = Stream.objects.filter(provider=self.provider).first() 112 self.assertIsNotNone(stream) 113 event = StreamEvent.objects.filter(stream=stream).first() 114 self.assertIsNotNone(event) 115 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 116 self.assertEqual( 117 event.payload["events"], 118 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 119 ) 120 121 def test_token_invalid(self): 122 res = self.client.post( 123 reverse( 124 "authentik_providers_ssf:stream", 125 kwargs={"application_slug": self.application.slug}, 126 ), 127 data={ 128 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 129 "aud": ["https://app.authentik.company"], 130 "delivery": { 131 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 132 "endpoint_url": "https://app.authentik.company", 133 }, 134 "events_requested": [ 135 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 136 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 137 ], 138 "format": "iss_sub", 139 }, 140 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a", 141 ) 142 # Response code needs to be 401 according to spec 143 self.assertEqual(res.status_code, 401) 144 145 def test_token_unrelated(self): 146 token = Token.objects.create( 147 identifier=generate_id(), user=create_test_user(), intent=TokenIntents.INTENT_API 148 ) 149 res = self.client.post( 150 reverse( 151 "authentik_providers_ssf:stream", 152 kwargs={"application_slug": self.application.slug}, 153 ), 154 data={ 155 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 156 "aud": ["https://app.authentik.company"], 157 "delivery": { 158 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 159 "endpoint_url": "https://app.authentik.company", 160 }, 161 "events_requested": [ 162 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 163 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 164 ], 165 "format": "iss_sub", 166 }, 167 HTTP_AUTHORIZATION=f"Bearer {token.key}", 168 ) 169 # Response code needs to be 401 according to spec 170 self.assertEqual(res.status_code, 401)
class
TestSSFAuth(rest_framework.test.APITestCase):
27class TestSSFAuth(APITestCase): 28 def setUp(self): 29 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 30 self.provider = SSFProvider.objects.create( 31 name=generate_id(), 32 signing_key=create_test_cert(), 33 backchannel_application=self.application, 34 ) 35 36 def test_stream_add_token(self): 37 """test stream add (token auth)""" 38 res = self.client.post( 39 reverse( 40 "authentik_providers_ssf:stream", 41 kwargs={"application_slug": self.application.slug}, 42 ), 43 data={ 44 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 45 "aud": ["https://app.authentik.company"], 46 "delivery": { 47 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 48 "endpoint_url": "https://app.authentik.company", 49 }, 50 "events_requested": [ 51 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 52 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 53 ], 54 "format": "iss_sub", 55 }, 56 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}", 57 ) 58 self.assertEqual(res.status_code, 201) 59 stream = Stream.objects.filter(provider=self.provider).first() 60 self.assertIsNotNone(stream) 61 event = StreamEvent.objects.filter(stream=stream).first() 62 self.assertIsNotNone(event) 63 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 64 self.assertEqual( 65 event.payload["events"], 66 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 67 ) 68 69 def test_stream_add_oidc(self): 70 """test stream add (oidc auth)""" 71 provider = OAuth2Provider.objects.create( 72 name=generate_id(), 73 authorization_flow=create_test_flow(), 74 ) 75 self.application.provider = provider 76 self.application.save() 77 user = create_test_admin_user() 78 token = AccessToken.objects.create( 79 provider=provider, 80 user=user, 81 token=generate_id(), 82 auth_time=timezone.now(), 83 _scope="openid user profile", 84 _id_token=json.dumps( 85 asdict( 86 IDToken("foo", "bar"), 87 ) 88 ), 89 ) 90 91 res = self.client.post( 92 reverse( 93 "authentik_providers_ssf:stream", 94 kwargs={"application_slug": self.application.slug}, 95 ), 96 data={ 97 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 98 "aud": ["https://app.authentik.company"], 99 "delivery": { 100 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 101 "endpoint_url": "https://app.authentik.company", 102 }, 103 "events_requested": [ 104 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 105 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 106 ], 107 "format": "iss_sub", 108 }, 109 HTTP_AUTHORIZATION=f"Bearer {token.token}", 110 ) 111 self.assertEqual(res.status_code, 201) 112 stream = Stream.objects.filter(provider=self.provider).first() 113 self.assertIsNotNone(stream) 114 event = StreamEvent.objects.filter(stream=stream).first() 115 self.assertIsNotNone(event) 116 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 117 self.assertEqual( 118 event.payload["events"], 119 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 120 ) 121 122 def test_token_invalid(self): 123 res = self.client.post( 124 reverse( 125 "authentik_providers_ssf:stream", 126 kwargs={"application_slug": self.application.slug}, 127 ), 128 data={ 129 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 130 "aud": ["https://app.authentik.company"], 131 "delivery": { 132 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 133 "endpoint_url": "https://app.authentik.company", 134 }, 135 "events_requested": [ 136 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 137 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 138 ], 139 "format": "iss_sub", 140 }, 141 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a", 142 ) 143 # Response code needs to be 401 according to spec 144 self.assertEqual(res.status_code, 401) 145 146 def test_token_unrelated(self): 147 token = Token.objects.create( 148 identifier=generate_id(), user=create_test_user(), intent=TokenIntents.INTENT_API 149 ) 150 res = self.client.post( 151 reverse( 152 "authentik_providers_ssf:stream", 153 kwargs={"application_slug": self.application.slug}, 154 ), 155 data={ 156 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 157 "aud": ["https://app.authentik.company"], 158 "delivery": { 159 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 160 "endpoint_url": "https://app.authentik.company", 161 }, 162 "events_requested": [ 163 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 164 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 165 ], 166 "format": "iss_sub", 167 }, 168 HTTP_AUTHORIZATION=f"Bearer {token.key}", 169 ) 170 # Response code needs to be 401 according to spec 171 self.assertEqual(res.status_code, 401)
Similar to TransactionTestCase, but use transaction.atomic() to achieve
test isolation.
In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).
On database backends with no transaction support, TestCase behaves as TransactionTestCase.
def
setUp(self):
28 def setUp(self): 29 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 30 self.provider = SSFProvider.objects.create( 31 name=generate_id(), 32 signing_key=create_test_cert(), 33 backchannel_application=self.application, 34 )
Hook method for setting up the test fixture before exercising it.
def
test_stream_add_token(self):
36 def test_stream_add_token(self): 37 """test stream add (token auth)""" 38 res = self.client.post( 39 reverse( 40 "authentik_providers_ssf:stream", 41 kwargs={"application_slug": self.application.slug}, 42 ), 43 data={ 44 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 45 "aud": ["https://app.authentik.company"], 46 "delivery": { 47 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 48 "endpoint_url": "https://app.authentik.company", 49 }, 50 "events_requested": [ 51 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 52 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 53 ], 54 "format": "iss_sub", 55 }, 56 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}", 57 ) 58 self.assertEqual(res.status_code, 201) 59 stream = Stream.objects.filter(provider=self.provider).first() 60 self.assertIsNotNone(stream) 61 event = StreamEvent.objects.filter(stream=stream).first() 62 self.assertIsNotNone(event) 63 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 64 self.assertEqual( 65 event.payload["events"], 66 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 67 )
test stream add (token auth)
def
test_stream_add_oidc(self):
69 def test_stream_add_oidc(self): 70 """test stream add (oidc auth)""" 71 provider = OAuth2Provider.objects.create( 72 name=generate_id(), 73 authorization_flow=create_test_flow(), 74 ) 75 self.application.provider = provider 76 self.application.save() 77 user = create_test_admin_user() 78 token = AccessToken.objects.create( 79 provider=provider, 80 user=user, 81 token=generate_id(), 82 auth_time=timezone.now(), 83 _scope="openid user profile", 84 _id_token=json.dumps( 85 asdict( 86 IDToken("foo", "bar"), 87 ) 88 ), 89 ) 90 91 res = self.client.post( 92 reverse( 93 "authentik_providers_ssf:stream", 94 kwargs={"application_slug": self.application.slug}, 95 ), 96 data={ 97 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 98 "aud": ["https://app.authentik.company"], 99 "delivery": { 100 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 101 "endpoint_url": "https://app.authentik.company", 102 }, 103 "events_requested": [ 104 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 105 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 106 ], 107 "format": "iss_sub", 108 }, 109 HTTP_AUTHORIZATION=f"Bearer {token.token}", 110 ) 111 self.assertEqual(res.status_code, 201) 112 stream = Stream.objects.filter(provider=self.provider).first() 113 self.assertIsNotNone(stream) 114 event = StreamEvent.objects.filter(stream=stream).first() 115 self.assertIsNotNone(event) 116 self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED) 117 self.assertEqual( 118 event.payload["events"], 119 {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}}, 120 )
test stream add (oidc auth)
def
test_token_invalid(self):
122 def test_token_invalid(self): 123 res = self.client.post( 124 reverse( 125 "authentik_providers_ssf:stream", 126 kwargs={"application_slug": self.application.slug}, 127 ), 128 data={ 129 "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5", 130 "aud": ["https://app.authentik.company"], 131 "delivery": { 132 "method": "https://schemas.openid.net/secevent/risc/delivery-method/push", 133 "endpoint_url": "https://app.authentik.company", 134 }, 135 "events_requested": [ 136 "https://schemas.openid.net/secevent/caep/event-type/credential-change", 137 "https://schemas.openid.net/secevent/caep/event-type/session-revoked", 138 ], 139 "format": "iss_sub", 140 }, 141 HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}a", 142 ) 143 # Response code needs to be 401 according to spec 144 self.assertEqual(res.status_code, 401)