authentik.enterprise.endpoints.connectors.agent.tests.test_connector_auth_fed

  1from json import loads
  2
  3from django.urls import reverse
  4from django.utils.timezone import now
  5from jwt import decode
  6from rest_framework.test import APITestCase
  7
  8from authentik.blueprints.tests import reconcile_app
  9from authentik.core.models import Group
 10from authentik.core.tests.utils import create_test_cert, create_test_user
 11from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
 12from authentik.endpoints.connectors.agent.models import AgentConnector, EnrollmentToken
 13from authentik.endpoints.models import Device, DeviceAccessGroup
 14from authentik.lib.generators import generate_id
 15from authentik.policies.models import PolicyBinding
 16from authentik.providers.oauth2.models import AccessToken, OAuth2Provider
 17
 18
 19class TestConnectorAuthFed(APITestCase):
 20
 21    def setUp(self):
 22        self.ssh_host_key = generate_id()
 23        self.connector = AgentConnector.objects.create(name=generate_id())
 24        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
 25        self.device = Device.objects.create(
 26            name=generate_id(),
 27            identifier=generate_id(),
 28        )
 29        self.connection = AgentDeviceConnection.objects.create(
 30            device=self.device,
 31            connector=self.connector,
 32        )
 33        self.connection.create_snapshot(
 34            data={
 35                "vendor": {
 36                    "goauthentik.io/platform": {
 37                        "ssh_host_keys": [
 38                            "foo",
 39                            self.ssh_host_key,
 40                            "baz",
 41                        ]
 42                    }
 43                }
 44            }
 45        )
 46        self.user = create_test_user()
 47        self.provider = OAuth2Provider.objects.create(
 48            name=generate_id(), signing_key=create_test_cert()
 49        )
 50        self.raw_token = self.provider.encode({"foo": "bar"})
 51        self.token = AccessToken.objects.create(
 52            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
 53        )
 54        self.connector.jwt_federation_providers.add(self.provider)
 55
 56    @reconcile_app("authentik_crypto")
 57    def test_auth_fed_no_access(self):
 58        response = self.client.post(
 59            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 60            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 61        )
 62        self.assertEqual(response.status_code, 400)
 63
 64    @reconcile_app("authentik_crypto")
 65    def test_auth_fed_policy_group_by_host_key(self):
 66        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 67        self.device.access_group = device_group
 68        self.device.save()
 69
 70        group = Group.objects.create(name=generate_id())
 71        group.users.add(self.user)
 72
 73        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 74
 75        response = self.client.post(
 76            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.ssh_host_key}",
 77            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 78        )
 79        self.assertEqual(response.status_code, 200)
 80        res = loads(response.content)
 81        token = decode(res["token"], options={"verify_signature": False})
 82        self.assertEqual(token["iss"], "goauthentik.io/platform")
 83        self.assertEqual(token["aud"], str(self.device.pk))
 84
 85    @reconcile_app("authentik_crypto")
 86    def test_auth_fed_policy_group(self):
 87        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 88        self.device.access_group = device_group
 89        self.device.save()
 90
 91        group = Group.objects.create(name=generate_id())
 92        group.users.add(self.user)
 93
 94        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 95
 96        response = self.client.post(
 97            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 98            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 99        )
100        self.assertEqual(response.status_code, 200)
101        res = loads(response.content)
102        token = decode(res["token"], options={"verify_signature": False})
103        self.assertEqual(token["iss"], "goauthentik.io/platform")
104        self.assertEqual(token["aud"], str(self.device.pk))
105
106    @reconcile_app("authentik_crypto")
107    def test_auth_fed_policy_group_deny(self):
108        device_group = DeviceAccessGroup.objects.create(name=generate_id())
109        self.device.access_group = device_group
110        self.device.save()
111
112        group = Group.objects.create(name=generate_id())
113        # group.users.add(self.user)
114
115        PolicyBinding.objects.create(target=device_group, group=group, order=0)
116
117        response = self.client.post(
118            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
119            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
120        )
121        self.assertEqual(response.status_code, 400)
122        self.assertJSONEqual(
123            response.content,
124            {
125                "policy_result": "Policy denied access",
126                "policy_messages": [],
127            },
128        )
129
130    @reconcile_app("authentik_crypto")
131    def test_auth_fed_invalid(self):
132        # No token
133        response = self.client.post(
134            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
135        )
136        self.assertEqual(response.status_code, 403)
137        # No device
138        response = self.client.post(
139            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
140            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
141        )
142        self.assertEqual(response.status_code, 403)
143        # invalid token
144        response = self.client.post(
145            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
146            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
147        )
148        self.assertEqual(response.status_code, 403)
class TestConnectorAuthFed(rest_framework.test.APITestCase):
 20class TestConnectorAuthFed(APITestCase):
 21
 22    def setUp(self):
 23        self.ssh_host_key = generate_id()
 24        self.connector = AgentConnector.objects.create(name=generate_id())
 25        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
 26        self.device = Device.objects.create(
 27            name=generate_id(),
 28            identifier=generate_id(),
 29        )
 30        self.connection = AgentDeviceConnection.objects.create(
 31            device=self.device,
 32            connector=self.connector,
 33        )
 34        self.connection.create_snapshot(
 35            data={
 36                "vendor": {
 37                    "goauthentik.io/platform": {
 38                        "ssh_host_keys": [
 39                            "foo",
 40                            self.ssh_host_key,
 41                            "baz",
 42                        ]
 43                    }
 44                }
 45            }
 46        )
 47        self.user = create_test_user()
 48        self.provider = OAuth2Provider.objects.create(
 49            name=generate_id(), signing_key=create_test_cert()
 50        )
 51        self.raw_token = self.provider.encode({"foo": "bar"})
 52        self.token = AccessToken.objects.create(
 53            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
 54        )
 55        self.connector.jwt_federation_providers.add(self.provider)
 56
 57    @reconcile_app("authentik_crypto")
 58    def test_auth_fed_no_access(self):
 59        response = self.client.post(
 60            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 61            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 62        )
 63        self.assertEqual(response.status_code, 400)
 64
 65    @reconcile_app("authentik_crypto")
 66    def test_auth_fed_policy_group_by_host_key(self):
 67        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 68        self.device.access_group = device_group
 69        self.device.save()
 70
 71        group = Group.objects.create(name=generate_id())
 72        group.users.add(self.user)
 73
 74        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 75
 76        response = self.client.post(
 77            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.ssh_host_key}",
 78            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
 79        )
 80        self.assertEqual(response.status_code, 200)
 81        res = loads(response.content)
 82        token = decode(res["token"], options={"verify_signature": False})
 83        self.assertEqual(token["iss"], "goauthentik.io/platform")
 84        self.assertEqual(token["aud"], str(self.device.pk))
 85
 86    @reconcile_app("authentik_crypto")
 87    def test_auth_fed_policy_group(self):
 88        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 89        self.device.access_group = device_group
 90        self.device.save()
 91
 92        group = Group.objects.create(name=generate_id())
 93        group.users.add(self.user)
 94
 95        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 96
 97        response = self.client.post(
 98            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 99            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
100        )
101        self.assertEqual(response.status_code, 200)
102        res = loads(response.content)
103        token = decode(res["token"], options={"verify_signature": False})
104        self.assertEqual(token["iss"], "goauthentik.io/platform")
105        self.assertEqual(token["aud"], str(self.device.pk))
106
107    @reconcile_app("authentik_crypto")
108    def test_auth_fed_policy_group_deny(self):
109        device_group = DeviceAccessGroup.objects.create(name=generate_id())
110        self.device.access_group = device_group
111        self.device.save()
112
113        group = Group.objects.create(name=generate_id())
114        # group.users.add(self.user)
115
116        PolicyBinding.objects.create(target=device_group, group=group, order=0)
117
118        response = self.client.post(
119            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
120            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
121        )
122        self.assertEqual(response.status_code, 400)
123        self.assertJSONEqual(
124            response.content,
125            {
126                "policy_result": "Policy denied access",
127                "policy_messages": [],
128            },
129        )
130
131    @reconcile_app("authentik_crypto")
132    def test_auth_fed_invalid(self):
133        # No token
134        response = self.client.post(
135            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
136        )
137        self.assertEqual(response.status_code, 403)
138        # No device
139        response = self.client.post(
140            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
141            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
142        )
143        self.assertEqual(response.status_code, 403)
144        # invalid token
145        response = self.client.post(
146            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
147            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
148        )
149        self.assertEqual(response.status_code, 403)

Similar to TransactionTestCase, but use transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
22    def setUp(self):
23        self.ssh_host_key = generate_id()
24        self.connector = AgentConnector.objects.create(name=generate_id())
25        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
26        self.device = Device.objects.create(
27            name=generate_id(),
28            identifier=generate_id(),
29        )
30        self.connection = AgentDeviceConnection.objects.create(
31            device=self.device,
32            connector=self.connector,
33        )
34        self.connection.create_snapshot(
35            data={
36                "vendor": {
37                    "goauthentik.io/platform": {
38                        "ssh_host_keys": [
39                            "foo",
40                            self.ssh_host_key,
41                            "baz",
42                        ]
43                    }
44                }
45            }
46        )
47        self.user = create_test_user()
48        self.provider = OAuth2Provider.objects.create(
49            name=generate_id(), signing_key=create_test_cert()
50        )
51        self.raw_token = self.provider.encode({"foo": "bar"})
52        self.token = AccessToken.objects.create(
53            provider=self.provider, user=self.user, token=self.raw_token, auth_time=now()
54        )
55        self.connector.jwt_federation_providers.add(self.provider)

Hook method for setting up the test fixture before exercising it.

@reconcile_app('authentik_crypto')
def test_auth_fed_no_access(self):
57    @reconcile_app("authentik_crypto")
58    def test_auth_fed_no_access(self):
59        response = self.client.post(
60            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
61            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
62        )
63        self.assertEqual(response.status_code, 400)
@reconcile_app('authentik_crypto')
def test_auth_fed_policy_group_by_host_key(self):
65    @reconcile_app("authentik_crypto")
66    def test_auth_fed_policy_group_by_host_key(self):
67        device_group = DeviceAccessGroup.objects.create(name=generate_id())
68        self.device.access_group = device_group
69        self.device.save()
70
71        group = Group.objects.create(name=generate_id())
72        group.users.add(self.user)
73
74        PolicyBinding.objects.create(target=device_group, group=group, order=0)
75
76        response = self.client.post(
77            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.ssh_host_key}",
78            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
79        )
80        self.assertEqual(response.status_code, 200)
81        res = loads(response.content)
82        token = decode(res["token"], options={"verify_signature": False})
83        self.assertEqual(token["iss"], "goauthentik.io/platform")
84        self.assertEqual(token["aud"], str(self.device.pk))
@reconcile_app('authentik_crypto')
def test_auth_fed_policy_group(self):
 86    @reconcile_app("authentik_crypto")
 87    def test_auth_fed_policy_group(self):
 88        device_group = DeviceAccessGroup.objects.create(name=generate_id())
 89        self.device.access_group = device_group
 90        self.device.save()
 91
 92        group = Group.objects.create(name=generate_id())
 93        group.users.add(self.user)
 94
 95        PolicyBinding.objects.create(target=device_group, group=group, order=0)
 96
 97        response = self.client.post(
 98            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
 99            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
100        )
101        self.assertEqual(response.status_code, 200)
102        res = loads(response.content)
103        token = decode(res["token"], options={"verify_signature": False})
104        self.assertEqual(token["iss"], "goauthentik.io/platform")
105        self.assertEqual(token["aud"], str(self.device.pk))
@reconcile_app('authentik_crypto')
def test_auth_fed_policy_group_deny(self):
107    @reconcile_app("authentik_crypto")
108    def test_auth_fed_policy_group_deny(self):
109        device_group = DeviceAccessGroup.objects.create(name=generate_id())
110        self.device.access_group = device_group
111        self.device.save()
112
113        group = Group.objects.create(name=generate_id())
114        # group.users.add(self.user)
115
116        PolicyBinding.objects.create(target=device_group, group=group, order=0)
117
118        response = self.client.post(
119            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
120            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
121        )
122        self.assertEqual(response.status_code, 400)
123        self.assertJSONEqual(
124            response.content,
125            {
126                "policy_result": "Policy denied access",
127                "policy_messages": [],
128            },
129        )
@reconcile_app('authentik_crypto')
def test_auth_fed_invalid(self):
131    @reconcile_app("authentik_crypto")
132    def test_auth_fed_invalid(self):
133        # No token
134        response = self.client.post(
135            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
136        )
137        self.assertEqual(response.status_code, 403)
138        # No device
139        response = self.client.post(
140            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}foo",
141            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}",
142        )
143        self.assertEqual(response.status_code, 403)
144        # invalid token
145        response = self.client.post(
146            reverse("authentik_api:agentconnector-auth-fed") + f"?device={self.device.name}",
147            HTTP_AUTHORIZATION=f"Bearer {self.raw_token}aa",
148        )
149        self.assertEqual(response.status_code, 403)