authentik.enterprise.endpoints.connectors.agent.tests.test_connector_auth_fed

  1from json import loads
  2
  3from django.urls import reverse
  4from django.utils.timezone import now
  5from jwt import decode
  6from rest_framework.test import APITestCase
  7
  8from authentik.blueprints.tests import reconcile_app
  9from authentik.core.models import Group
 10from authentik.core.tests.utils import create_test_cert, create_test_user
 11from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
 12from authentik.endpoints.connectors.agent.models import AgentConnector, EnrollmentToken
 13from authentik.endpoints.models import Device, DeviceAccessGroup
 14from authentik.lib.generators import generate_id
 15from authentik.policies.models import PolicyBinding
 16from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
 17
 18
 19class TestConnectorAuthFed(APITestCase):
 20
 21    def setUp(self):
 22        self.connector = AgentConnector.objects.create(name=generate_id())
 23        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
 24        self.device = Device.objects.create(
 25            name=generate_id(),
 26            identifier=generate_id(),
 27        )
 28        self.connection = AgentDeviceConnection.objects.create(
 29            device=self.device,
 30            connector=self.connector,
 31        )
 32        self.user = create_test_user()
 33        self.provider = OAuth2Provider.objects.create(
 34            name=generate_id(), signing_key=create_test_cert()
 35        )
 36        self.raw_token = self.provider.encode({"foo": "bar"})
 37        self.token = AccessToken.objects.create(
 38            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
 39        )
 40        self.connector.jwt_federation_providers.add(self.provider)
 41
 42    @reconcile_app("authentik_crypto")
 43    def test_auth_fed(self):
 44        response = self.client.post(
 45            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 46            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 47        )
 48        self.assertEqual(response.status_code, 400)
 49
 50    @reconcile_app("authentik_crypto")
 51    def test_auth_fed_policy_group(self):
 52        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 53        self.device.access_group = device_group
 54        self.device.save()
 55
 56        group = Group.objects.create(name=generate_id())
 57        group.users.add(self.user)
 58
 59        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 60
 61        response = self.client.post(
 62            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 63            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 64        )
 65        self.assertEqual(response.status_code, 200)
 66        res = loads(response.content)
 67        token = decode(res["token"], options={"verify_signature": False})
 68        self.assertEqual(token["iss"], "goauthentik.io/platform")
 69        self.assertEqual(token["aud"], str(self.device.pk))
 70
 71    @reconcile_app("authentik_crypto")
 72    def test_auth_fed_policy_group_deny(self):
 73        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 74        self.device.access_group = device_group
 75        self.device.save()
 76
 77        group = Group.objects.create(name=generate_id())
 78        # group.users.add(self.user)
 79
 80        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 81
 82        response = self.client.post(
 83            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 84            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 85        )
 86        self.assertEqual(response.status_code, 400)
 87        self.assertJSONEqual(
 88            response.content,
 89            {
 90                "policy_result": "Policy denied access",
 91                "policy_messages": [],
 92            },
 93        )
 94
 95    @reconcile_app("authentik_crypto")
 96    def test_auth_fed_invalid(self):
 97        # No token
 98        response = self.client.post(
 99            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
100        )
101        self.assertEqual(response.status_code, 403)
102        # No device
103        response = self.client.post(
104            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
105            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
106        )
107        self.assertEqual(response.status_code, 403)
108        # invalid token
109        response = self.client.post(
110            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
111            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
112        )
113        self.assertEqual(response.status_code, 403)
class TestConnectorAuthFed(rest_framework.test.APITestCase):
 20class TestConnectorAuthFed(APITestCase):
 21
 22    def setUp(self):
 23        self.connector = AgentConnector.objects.create(name=generate_id())
 24        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
 25        self.device = Device.objects.create(
 26            name=generate_id(),
 27            identifier=generate_id(),
 28        )
 29        self.connection = AgentDeviceConnection.objects.create(
 30            device=self.device,
 31            connector=self.connector,
 32        )
 33        self.user = create_test_user()
 34        self.provider = OAuth2Provider.objects.create(
 35            name=generate_id(), signing_key=create_test_cert()
 36        )
 37        self.raw_token = self.provider.encode({"foo": "bar"})
 38        self.token = AccessToken.objects.create(
 39            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
 40        )
 41        self.connector.jwt_federation_providers.add(self.provider)
 42
 43    @reconcile_app("authentik_crypto")
 44    def test_auth_fed(self):
 45        response = self.client.post(
 46            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 47            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 48        )
 49        self.assertEqual(response.status_code, 400)
 50
 51    @reconcile_app("authentik_crypto")
 52    def test_auth_fed_policy_group(self):
 53        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 54        self.device.access_group = device_group
 55        self.device.save()
 56
 57        group = Group.objects.create(name=generate_id())
 58        group.users.add(self.user)
 59
 60        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 61
 62        response = self.client.post(
 63            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 64            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 65        )
 66        self.assertEqual(response.status_code, 200)
 67        res = loads(response.content)
 68        token = decode(res["token"], options={"verify_signature": False})
 69        self.assertEqual(token["iss"], "goauthentik.io/platform")
 70        self.assertEqual(token["aud"], str(self.device.pk))
 71
 72    @reconcile_app("authentik_crypto")
 73    def test_auth_fed_policy_group_deny(self):
 74        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 75        self.device.access_group = device_group
 76        self.device.save()
 77
 78        group = Group.objects.create(name=generate_id())
 79        # group.users.add(self.user)
 80
 81        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 82
 83        response = self.client.post(
 84            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 85            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 86        )
 87        self.assertEqual(response.status_code, 400)
 88        self.assertJSONEqual(
 89            response.content,
 90            {
 91                "policy_result": "Policy denied access",
 92                "policy_messages": [],
 93            },
 94        )
 95
 96    @reconcile_app("authentik_crypto")
 97    def test_auth_fed_invalid(self):
 98        # No token
 99        response = self.client.post(
100            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
101        )
102        self.assertEqual(response.status_code, 403)
103        # No device
104        response = self.client.post(
105            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
106            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
107        )
108        self.assertEqual(response.status_code, 403)
109        # invalid token
110        response = self.client.post(
111            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
112            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
113        )
114        self.assertEqual(response.status_code, 403)

Similar to TransactionTestCase, but uses transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
22    def setUp(self):
23        self.connector = AgentConnector.objects.create(name=generate_id())
24        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
25        self.device = Device.objects.create(
26            name=generate_id(),
27            identifier=generate_id(),
28        )
29        self.connection = AgentDeviceConnection.objects.create(
30            device=self.device,
31            connector=self.connector,
32        )
33        self.user = create_test_user()
34        self.provider = OAuth2Provider.objects.create(
35            name=generate_id(), signing_key=create_test_cert()
36        )
37        self.raw_token = self.provider.encode({"foo": "bar"})
38        self.token = AccessToken.objects.create(
39            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
40        )
41        self.connector.jwt_federation_providers.add(self.provider)

Hook method for setting up the test fixture before exercising it.

@reconcile_app('authentik_crypto')
def test_auth_fed(self):
43    @reconcile_app("authentik_crypto")
44    def test_auth_fed(self):
45        response = self.client.post(
46            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
47            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
48        )
49        self.assertEqual(response.status_code, 400)
@reconcile_app('authentik_crypto')
def test_auth_fed_policy_group(self):
51    @reconcile_app("authentik_crypto")
52    def test_auth_fed_policy_group(self):
53        device_group = DeviceAccessGroup.objects.create(name=generate_id())
54        self.device.access_group = device_group
55        self.device.save()
56
57        group = Group.objects.create(name=generate_id())
58        group.users.add(self.user)
59
60        PolicyBinding.objects.create(target=device_group, group=group, order=0)
61
62        response = self.client.post(
63            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
64            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
65        )
66        self.assertEqual(response.status_code, 200)
67        res = loads(response.content)
68        token = decode(res["token"], options={"verify_signature": False})
69        self.assertEqual(token["iss"], "goauthentik.io/platform")
70        self.assertEqual(token["aud"], str(self.device.pk))
@reconcile_app('authentik_crypto')
def test_auth_fed_policy_group_deny(self):
72    @reconcile_app("authentik_crypto")
73    def test_auth_fed_policy_group_deny(self):
74        device_group = DeviceAccessGroup.objects.create(name=generate_id())
75        self.device.access_group = device_group
76        self.device.save()
77
78        group = Group.objects.create(name=generate_id())
79        # group.users.add(self.user)
80
81        PolicyBinding.objects.create(target=device_group, group=group, order=0)
82
83        response = self.client.post(
84            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
85            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
86        )
87        self.assertEqual(response.status_code, 400)
88        self.assertJSONEqual(
89            response.content,
90            {
91                "policy_result": "Policy denied access",
92                "policy_messages": [],
93            },
94        )
@reconcile_app('authentik_crypto')
def test_auth_fed_invalid(self):
 96    @reconcile_app("authentik_crypto")
 97    def test_auth_fed_invalid(self):
 98        # No token
 99        response = self.client.post(
100            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
101        )
102        self.assertEqual(response.status_code, 403)
103        # No device
104        response = self.client.post(
105            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
106            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
107        )
108        self.assertEqual(response.status_code, 403)
109        # invalid token
110        response = self.client.post(
111            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
112            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
113        )
114        self.assertEqual(response.status_code, 403)