authentik.enterprise.endpoints.connectors.agent.tests.test_connector_auth_fed
from json import loads

from django.urls import reverse
from django.utils.timezone import now
from jwt import decode
from rest_framework.test import APITestCase

from authentik.blueprints.tests import reconcile_app
from authentik.core.models import Group
from authentik.core.tests.utils import create_test_cert, create_test_user
from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
from authentik.endpoints.connectors.agent.models import AgentConnector, EnrollmentToken
from authentik.endpoints.models import Device, DeviceAccessGroup
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.models import AccessToken, OAuth2Provider


class TestConnectorAuthFed(APITestCase):
    """Exercise the agent connector ``auth-fed`` API action with federated JWTs."""

    def setUp(self):
        """Create the connector, device, snapshot, user and federated access token.

        The enrollment token is kept as ``self.enrollment_token`` so it is not
        overwritten; ``self.token`` refers to the OAuth2 access token.
        """
        self.ssh_host_key = generate_id()
        self.connector = AgentConnector.objects.create(name=generate_id())
        self.enrollment_token = EnrollmentToken.objects.create(
            name=generate_id(), connector=self.connector
        )
        self.device = Device.objects.create(
            name=generate_id(),
            identifier=generate_id(),
        )
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        # The snapshot lists the generated host key between two filler entries.
        self.connection.create_snapshot(
            data={
                "vendor": {
                    "goauthentik.io/platform": {
                        "ssh_host_keys": [
                            "foo",
                            self.ssh_host_key,
                            "baz",
                        ]
                    }
                }
            }
        )
        self.user = create_test_user()
        self.provider = OAuth2Provider.objects.create(
            name=generate_id(), signing_key=create_test_cert()
        )
        self.raw_token = self.provider.encode({"foo": "bar"})
        self.token = AccessToken.objects.create(
            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
        )
        self.connector.jwt_federation_providers.add(self.provider)

    def _auth_fed_url(self, device: str) -> str:
        """Return the auth-fed endpoint URL with ``device`` as its query parameter."""
        return reverse("authentik_api:agentconnector-auth-fed") + f"?device={device}"

    def _bind_device_access_group(self, *, include_user: bool = True) -> None:
        """Give the device a new access group bound to a new user group.

        When ``include_user`` is true the test user is added to that user group.
        """
        device_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.device.access_group = device_group
        self.device.save()

        group = Group.objects.create(name=generate_id())
        if include_user:
            group.users.add(self.user)

        PolicyBinding.objects.create(target=device_group, group=group, order=0)

    def _assert_platform_token(self, response) -> None:
        """Assert a 200 response whose token has the expected ``iss`` and ``aud``."""
        self.assertEqual(response.status_code, 200)
        res = loads(response.content)
        # Only the claims are inspected here, so the signature is not verified.
        token = decode(res["token"], options={"verify_signature": False})
        self.assertEqual(token["iss"], "goauthentik.io/platform")
        self.assertEqual(token["aud"], str(self.device.pk))

    @reconcile_app("authentik_crypto")
    def test_auth_fed_no_access(self):
        """A device without an access group is answered with 400."""
        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 400)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group_by_host_key(self):
        """Addressing the device by a snapshot SSH host key returns a token."""
        self._bind_device_access_group()

        response = self.client.post(
            self._auth_fed_url(self.ssh_host_key),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self._assert_platform_token(response)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group(self):
        """Addressing the device by name returns a token for a group member."""
        self._bind_device_access_group()

        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self._assert_platform_token(response)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group_deny(self):
        """A user outside the bound group gets 400 with a policy denial body."""
        # The test user is deliberately left out of the bound group.
        self._bind_device_access_group(include_user=False)

        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 400)
        self.assertJSONEqual(
            response.content,
            {
                "policy_result": "Policy denied access",
                "policy_messages": [],
            },
        )

    @reconcile_app("authentik_crypto")
    def test_auth_fed_invalid(self):
        """Missing token, unknown device and tampered token all yield 403."""
        # No token (the device name is also suffixed, so it matches no device)
        response = self.client.post(
            self._auth_fed_url(f"{self.device.name}foo"),
        )
        self.assertEqual(response.status_code, 403)
        # Unknown device, valid token
        response = self.client.post(
            self._auth_fed_url(f"{self.device.name}foo"),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 403)
        # Invalid token, existing device
        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
        )
        self.assertEqual(response.status_code, 403)
class
TestConnectorAuthFed(rest_framework.test.APITestCase):
class TestConnectorAuthFed(APITestCase):
    """Exercise the agent connector ``auth-fed`` API action with federated JWTs."""

    def setUp(self):
        """Create the connector, device, snapshot, user and federated access token.

        The enrollment token is kept as ``self.enrollment_token`` so it is not
        overwritten; ``self.token`` refers to the OAuth2 access token.
        """
        self.ssh_host_key = generate_id()
        self.connector = AgentConnector.objects.create(name=generate_id())
        self.enrollment_token = EnrollmentToken.objects.create(
            name=generate_id(), connector=self.connector
        )
        self.device = Device.objects.create(
            name=generate_id(),
            identifier=generate_id(),
        )
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        # The snapshot lists the generated host key between two filler entries.
        self.connection.create_snapshot(
            data={
                "vendor": {
                    "goauthentik.io/platform": {
                        "ssh_host_keys": [
                            "foo",
                            self.ssh_host_key,
                            "baz",
                        ]
                    }
                }
            }
        )
        self.user = create_test_user()
        self.provider = OAuth2Provider.objects.create(
            name=generate_id(), signing_key=create_test_cert()
        )
        self.raw_token = self.provider.encode({"foo": "bar"})
        self.token = AccessToken.objects.create(
            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
        )
        self.connector.jwt_federation_providers.add(self.provider)

    def _auth_fed_url(self, device: str) -> str:
        """Return the auth-fed endpoint URL with ``device`` as its query parameter."""
        return reverse("authentik_api:agentconnector-auth-fed") + f"?device={device}"

    def _bind_device_access_group(self, *, include_user: bool = True) -> None:
        """Give the device a new access group bound to a new user group.

        When ``include_user`` is true the test user is added to that user group.
        """
        device_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.device.access_group = device_group
        self.device.save()

        group = Group.objects.create(name=generate_id())
        if include_user:
            group.users.add(self.user)

        PolicyBinding.objects.create(target=device_group, group=group, order=0)

    def _assert_platform_token(self, response) -> None:
        """Assert a 200 response whose token has the expected ``iss`` and ``aud``."""
        self.assertEqual(response.status_code, 200)
        res = loads(response.content)
        # Only the claims are inspected here, so the signature is not verified.
        token = decode(res["token"], options={"verify_signature": False})
        self.assertEqual(token["iss"], "goauthentik.io/platform")
        self.assertEqual(token["aud"], str(self.device.pk))

    @reconcile_app("authentik_crypto")
    def test_auth_fed_no_access(self):
        """A device without an access group is answered with 400."""
        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 400)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group_by_host_key(self):
        """Addressing the device by a snapshot SSH host key returns a token."""
        self._bind_device_access_group()

        response = self.client.post(
            self._auth_fed_url(self.ssh_host_key),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self._assert_platform_token(response)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group(self):
        """Addressing the device by name returns a token for a group member."""
        self._bind_device_access_group()

        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self._assert_platform_token(response)

    @reconcile_app("authentik_crypto")
    def test_auth_fed_policy_group_deny(self):
        """A user outside the bound group gets 400 with a policy denial body."""
        # The test user is deliberately left out of the bound group.
        self._bind_device_access_group(include_user=False)

        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 400)
        self.assertJSONEqual(
            response.content,
            {
                "policy_result": "Policy denied access",
                "policy_messages": [],
            },
        )

    @reconcile_app("authentik_crypto")
    def test_auth_fed_invalid(self):
        """Missing token, unknown device and tampered token all yield 403."""
        # No token (the device name is also suffixed, so it matches no device)
        response = self.client.post(
            self._auth_fed_url(f"{self.device.name}foo"),
        )
        self.assertEqual(response.status_code, 403)
        # Unknown device, valid token
        response = self.client.post(
            self._auth_fed_url(f"{self.device.name}foo"),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
        )
        self.assertEqual(response.status_code, 403)
        # Invalid token, existing device
        response = self.client.post(
            self._auth_fed_url(self.device.name),
            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
        )
        self.assertEqual(response.status_code, 403)
Similar to TransactionTestCase, but use transaction.atomic() to achieve
test isolation.
In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).
On database backends with no transaction support, TestCase behaves as TransactionTestCase.
def
setUp(self):
def setUp(self):
    """Create the connector, device, snapshot, user and federated access token.

    The enrollment token is kept as ``self.enrollment_token`` so it is not
    overwritten; ``self.token`` refers to the OAuth2 access token.
    """
    self.ssh_host_key = generate_id()
    self.connector = AgentConnector.objects.create(name=generate_id())
    self.enrollment_token = EnrollmentToken.objects.create(
        name=generate_id(), connector=self.connector
    )
    self.device = Device.objects.create(
        name=generate_id(),
        identifier=generate_id(),
    )
    self.connection = AgentDeviceConnection.objects.create(
        device=self.device,
        connector=self.connector,
    )
    # The snapshot lists the generated host key between two filler entries.
    self.connection.create_snapshot(
        data={
            "vendor": {
                "goauthentik.io/platform": {
                    "ssh_host_keys": [
                        "foo",
                        self.ssh_host_key,
                        "baz",
                    ]
                }
            }
        }
    )
    self.user = create_test_user()
    self.provider = OAuth2Provider.objects.create(
        name=generate_id(), signing_key=create_test_cert()
    )
    self.raw_token = self.provider.encode({"foo": "bar"})
    self.token = AccessToken.objects.create(
        provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
    )
    self.connector.jwt_federation_providers.add(self.provider)
Hook method for setting up the test fixture before exercising it.
@reconcile_app('authentik_crypto')
def
test_auth_fed_policy_group_by_host_key(self):
@reconcile_app("authentik_crypto")
def test_auth_fed_policy_group_by_host_key(self):
    """Addressing the device by a snapshot SSH host key returns a token."""
    # Access group on the device, bound to a user group containing the test user.
    access_group = DeviceAccessGroup.objects.create(name=generate_id())
    self.device.access_group = access_group
    self.device.save()

    members = Group.objects.create(name=generate_id())
    members.users.add(self.user)

    PolicyBinding.objects.create(target=access_group, group=members, order=0)

    # The device is identified by its host key rather than its name.
    url = reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.ssh_host_key}"
    authorization = f"Bearer {self.raw_token}"
    response = self.client.post(url, HTTP_AUTHORIZATION=authorization)
    self.assertEqual(response.status_code, 200)

    # Only the claims are inspected, so the signature is not verified.
    payload = loads(response.content)
    claims = decode(payload["token"], options={"verify_signature": False})
    self.assertEqual(claims["iss"], "goauthentik.io/platform")
    self.assertEqual(claims["aud"], str(self.device.pk))
@reconcile_app('authentik_crypto')
def
test_auth_fed_policy_group(self):
@reconcile_app("authentik_crypto")
def test_auth_fed_policy_group(self):
    """Addressing the device by name returns a token for a group member."""
    # Access group on the device, bound to a user group containing the test user.
    access_group = DeviceAccessGroup.objects.create(name=generate_id())
    self.device.access_group = access_group
    self.device.save()

    members = Group.objects.create(name=generate_id())
    members.users.add(self.user)

    PolicyBinding.objects.create(target=access_group, group=members, order=0)

    url = reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}"
    authorization = f"Bearer {self.raw_token}"
    response = self.client.post(url, HTTP_AUTHORIZATION=authorization)
    self.assertEqual(response.status_code, 200)

    # Only the claims are inspected, so the signature is not verified.
    payload = loads(response.content)
    claims = decode(payload["token"], options={"verify_signature": False})
    self.assertEqual(claims["iss"], "goauthentik.io/platform")
    self.assertEqual(claims["aud"], str(self.device.pk))
@reconcile_app('authentik_crypto')
def
test_auth_fed_policy_group_deny(self):
@reconcile_app("authentik_crypto")
def test_auth_fed_policy_group_deny(self):
    """A user outside the bound group gets 400 with a policy denial body."""
    access_group = DeviceAccessGroup.objects.create(name=generate_id())
    self.device.access_group = access_group
    self.device.save()

    # The test user is deliberately NOT added to this group.
    empty_group = Group.objects.create(name=generate_id())

    PolicyBinding.objects.create(target=access_group, group=empty_group, order=0)

    url = reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}"
    authorization = f"Bearer {self.raw_token}"
    response = self.client.post(url, HTTP_AUTHORIZATION=authorization)
    self.assertEqual(response.status_code, 400)

    expected_body = {
        "policy_result": "Policy denied access",
        "policy_messages": [],
    }
    self.assertJSONEqual(response.content, expected_body)
@reconcile_app('authentik_crypto')
def
test_auth_fed_invalid(self):
@reconcile_app("authentik_crypto")
def test_auth_fed_invalid(self):
    """Missing token, unknown device and tampered token all yield 403."""
    endpoint = reverse("authentik_api:agentconnector-auth-fed")

    # No Authorization header (the device name is suffixed with "foo" as well).
    no_token = self.client.post(endpoint + f"?device={self.device.name}foo")
    self.assertEqual(no_token.status_code, 403)

    # Suffixed device name together with the unmodified token.
    unknown_device = self.client.post(
        endpoint + f"?device={self.device.name}foo",
        HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
    )
    self.assertEqual(unknown_device.status_code, 403)

    # Existing device name together with a token that has extra characters appended.
    bad_token = self.client.post(
        endpoint + f"?device={self.device.name}",
        HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
    )
    self.assertEqual(bad_token.status_code, 403)