authentik.enterprise.endpoints.connectors.agent.tests.test_connector_auth_ia

  1from hashlib import sha256
  2from json import loads
  3from unittest.mock import MagicMock, patch
  4from urllib.parse import parse_qs, urlparse
  5
  6from django.urls import reverse
  7from jwt import decode
  8
  9from authentik.blueprints.tests import reconcile_app
 10from authentik.core.models import Group
 11from authentik.core.tests.utils import create_test_flow, create_test_user
 12from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
 13from authentik.endpoints.connectors.agent.models import AgentConnector, DeviceToken, EnrollmentToken
 14from authentik.endpoints.models import Device, DeviceAccessGroup
 15from authentik.enterprise.endpoints.connectors.agent.views.auth_interactive import QS_AGENT_IA_TOKEN
 16from authentik.enterprise.license import LicenseKey
 17from authentik.enterprise.models import License
 18from authentik.enterprise.tests.test_license import expiry_valid
 19from authentik.flows.tests import FlowTestCase
 20from authentik.lib.generators import generate_id
 21from authentik.policies.models import PolicyBinding
 22
 23
 24class TestConnectorAuthIA(FlowTestCase):
 25
 26    def setUp(self):
 27        self.connector = AgentConnector.objects.create(
 28            name=generate_id(),
 29            authorization_flow=create_test_flow(),
 30        )
 31        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
 32        self.device = Device.objects.create(
 33            name=generate_id(),
 34            identifier=generate_id(),
 35        )
 36        self.connection = AgentDeviceConnection.objects.create(
 37            device=self.device,
 38            connector=self.connector,
 39        )
 40        self.device_token = DeviceToken.objects.create(
 41            device=self.connection,
 42            key=generate_id(),
 43        )
 44        self.user = create_test_user()
 45
 46    @patch(
 47        "authentik.enterprise.license.LicenseKey.validate",
 48        MagicMock(
 49            return_value=LicenseKey(
 50                aud="",
 51                exp=expiry_valid,
 52                name=generate_id(),
 53                internal_users=100,
 54                external_users=100,
 55            )
 56        ),
 57    )
 58    def test_auth_ia_initiate(self):
 59        License.objects.create(key=generate_id())
 60        response = self.client.post(
 61            reverse("authentik_api:agentconnector-auth-ia"),
 62            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
 63        )
 64        self.assertEqual(response.status_code, 200)
 65
 66    @patch(
 67        "authentik.enterprise.license.LicenseKey.validate",
 68        MagicMock(
 69            return_value=LicenseKey(
 70                aud="",
 71                exp=expiry_valid,
 72                name=generate_id(),
 73                internal_users=100,
 74                external_users=100,
 75            )
 76        ),
 77    )
 78    @reconcile_app("authentik_crypto")
 79    def test_auth_ia_fulfill(self):
 80        License.objects.create(key=generate_id())
 81        self.client.force_login(self.user)
 82        response = self.client.post(
 83            reverse("authentik_api:agentconnector-auth-ia"),
 84            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
 85            HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH=sha256(self.device_token.key.encode()).hexdigest(),
 86        )
 87        self.assertEqual(response.status_code, 200)
 88        res = loads(response.content)
 89        response = self.client.get(
 90            res["url"],
 91            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
 92            HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH=sha256(self.device_token.key.encode()).hexdigest(),
 93        )
 94        self.assertEqual(response.status_code, 200)
 95        self.assertIn(b"Permission denied", response.content)
 96
 97    @patch(
 98        "authentik.enterprise.license.LicenseKey.validate",
 99        MagicMock(
100            return_value=LicenseKey(
101                aud="",
102                exp=expiry_valid,
103                name=generate_id(),
104                internal_users=100,
105                external_users=100,
106            )
107        ),
108    )
109    @reconcile_app("authentik_crypto")
110    def test_auth_ia_fulfill_policy(self):
111        License.objects.create(key=generate_id())
112        device_group = DeviceAccessGroup.objects.create(name=generate_id())
113        self.device.access_group = device_group
114        self.device.save()
115
116        group = Group.objects.create(name=generate_id())
117        group.users.add(self.user)
118
119        PolicyBinding.objects.create(target=device_group, group=group, order=0)
120
121        self.client.force_login(self.user)
122        response = self.client.post(
123            reverse("authentik_api:agentconnector-auth-ia"),
124            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
125            HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH=sha256(self.device_token.key.encode()).hexdigest(),
126        )
127        self.assertEqual(response.status_code, 200)
128        res = loads(response.content)
129        response = self.client.get(
130            res["url"],
131            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
132            HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH=sha256(self.device_token.key.encode()).hexdigest(),
133        )
134        self.assertEqual(response.status_code, 302)
135        url = urlparse(response.url)
136        self.assertEqual(url.scheme, "goauthentik.io")
137        qs = parse_qs(url.query)
138        raw_token = qs[QS_AGENT_IA_TOKEN][0]
139        token = decode(raw_token.encode(), options={"verify_signature": False})
140        self.assertEqual(token["iss"], "goauthentik.io/platform")
141        self.assertEqual(token["aud"], str(self.device.pk))
class TestConnectorAuthIA(FlowTestCase):
    """Tests for the agent connector's interactive authentication (auth-ia) endpoint."""

    def setUp(self):
        """Create the fixtures shared by every test.

        Builds an agent connector with a test authorization flow, an enrollment
        token for it, a device, the device's connection to the connector, a
        device token bound to that connection, and a test user.
        """
        connector_name = generate_id()
        flow = create_test_flow()
        self.connector = AgentConnector.objects.create(
            name=connector_name, authorization_flow=flow
        )
        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
        self.device = Device.objects.create(name=generate_id(), identifier=generate_id())
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device, connector=self.connector
        )
        self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())
        self.user = create_test_user()

    # Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        new=MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    def test_auth_ia_initiate(self):
        """POST to the auth-ia endpoint with the device token as bearer and expect 200."""
        License.objects.create(key=generate_id())
        endpoint = reverse("authentik_api:agentconnector-auth-ia")
        bearer = f"Bearer+agent {self.device_token.key}"
        response = self.client.post(endpoint, HTTP_AUTHORIZATION=bearer)
        self.assertEqual(response.status_code, 200)

    # Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        new=MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill(self):
        """Follow the URL returned by auth-ia when no access group is set on the device.

        Expects the follow-up GET to answer 200 with ``Permission denied`` in
        the response body.
        """
        License.objects.create(key=generate_id())
        self.client.force_login(self.user)

        # Both requests carry the same bearer token and the SHA-256 hex digest of its key.
        key = self.device_token.key
        agent_headers = {
            "HTTP_AUTHORIZATION": f"Bearer+agent {key}",
            "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(key.encode()).hexdigest(),
        }

        initiate = self.client.post(
            reverse("authentik_api:agentconnector-auth-ia"), **agent_headers
        )
        self.assertEqual(initiate.status_code, 200)

        fulfill_url = loads(initiate.content)["url"]
        fulfill = self.client.get(fulfill_url, **agent_headers)
        self.assertEqual(fulfill.status_code, 200)
        self.assertIn(b"Permission denied", fulfill.content)

    # Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        new=MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill_policy(self):
        """Follow the auth-ia URL when the user's group is bound to the device's access group.

        Expects a 302 redirect to a ``goauthentik.io`` scheme URL whose query
        string carries a JWT with the expected ``iss`` and ``aud`` claims.
        """
        License.objects.create(key=generate_id())

        # Put the device into an access group ...
        access_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.device.access_group = access_group
        self.device.save()

        # ... and bind a group containing the test user to that access group.
        user_group = Group.objects.create(name=generate_id())
        user_group.users.add(self.user)
        PolicyBinding.objects.create(target=access_group, group=user_group, order=0)

        self.client.force_login(self.user)

        # Both requests carry the same bearer token and the SHA-256 hex digest of its key.
        key = self.device_token.key
        agent_headers = {
            "HTTP_AUTHORIZATION": f"Bearer+agent {key}",
            "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(key.encode()).hexdigest(),
        }

        initiate = self.client.post(
            reverse("authentik_api:agentconnector-auth-ia"), **agent_headers
        )
        self.assertEqual(initiate.status_code, 200)

        fulfill_url = loads(initiate.content)["url"]
        fulfill = self.client.get(fulfill_url, **agent_headers)
        self.assertEqual(fulfill.status_code, 302)

        redirect = urlparse(fulfill.url)
        self.assertEqual(redirect.scheme, "goauthentik.io")

        # The token is only inspected here, so its signature is not verified.
        raw_token = parse_qs(redirect.query)[QS_AGENT_IA_TOKEN][0]
        claims = decode(raw_token.encode(), options={"verify_signature": False})
        self.assertEqual(claims["iss"], "goauthentik.io/platform")
        self.assertEqual(claims["aud"], str(self.device.pk))

Helpers for testing flows and stages.

def setUp(self):
    """Create the fixtures shared by every test.

    Builds an agent connector with a test authorization flow, an enrollment
    token for it, a device, the device's connection to the connector, a
    device token bound to that connection, and a test user.
    """
    connector_name = generate_id()
    flow = create_test_flow()
    self.connector = AgentConnector.objects.create(name=connector_name, authorization_flow=flow)
    self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
    self.device = Device.objects.create(name=generate_id(), identifier=generate_id())
    self.connection = AgentDeviceConnection.objects.create(
        device=self.device, connector=self.connector
    )
    self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())
    self.user = create_test_user()

Hook method for setting up the test fixture before exercising it.

# Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    new=MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
def test_auth_ia_initiate(self):
    """POST to the auth-ia endpoint with the device token as bearer and expect 200."""
    License.objects.create(key=generate_id())
    endpoint = reverse("authentik_api:agentconnector-auth-ia")
    bearer = f"Bearer+agent {self.device_token.key}"
    response = self.client.post(endpoint, HTTP_AUTHORIZATION=bearer)
    self.assertEqual(response.status_code, 200)
# Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    new=MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
@reconcile_app("authentik_crypto")
def test_auth_ia_fulfill(self):
    """Follow the URL returned by auth-ia when no access group is set on the device.

    Expects the follow-up GET to answer 200 with ``Permission denied`` in
    the response body.
    """
    License.objects.create(key=generate_id())
    self.client.force_login(self.user)

    # Both requests carry the same bearer token and the SHA-256 hex digest of its key.
    key = self.device_token.key
    agent_headers = {
        "HTTP_AUTHORIZATION": f"Bearer+agent {key}",
        "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(key.encode()).hexdigest(),
    }

    initiate = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **agent_headers)
    self.assertEqual(initiate.status_code, 200)

    fulfill_url = loads(initiate.content)["url"]
    fulfill = self.client.get(fulfill_url, **agent_headers)
    self.assertEqual(fulfill.status_code, 200)
    self.assertIn(b"Permission denied", fulfill.content)
# Stub LicenseKey.validate so it hands back a fixed LicenseKey instance.
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    new=MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
@reconcile_app("authentik_crypto")
def test_auth_ia_fulfill_policy(self):
    """Follow the auth-ia URL when the user's group is bound to the device's access group.

    Expects a 302 redirect to a ``goauthentik.io`` scheme URL whose query
    string carries a JWT with the expected ``iss`` and ``aud`` claims.
    """
    License.objects.create(key=generate_id())

    # Put the device into an access group ...
    access_group = DeviceAccessGroup.objects.create(name=generate_id())
    self.device.access_group = access_group
    self.device.save()

    # ... and bind a group containing the test user to that access group.
    user_group = Group.objects.create(name=generate_id())
    user_group.users.add(self.user)
    PolicyBinding.objects.create(target=access_group, group=user_group, order=0)

    self.client.force_login(self.user)

    # Both requests carry the same bearer token and the SHA-256 hex digest of its key.
    key = self.device_token.key
    agent_headers = {
        "HTTP_AUTHORIZATION": f"Bearer+agent {key}",
        "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(key.encode()).hexdigest(),
    }

    initiate = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **agent_headers)
    self.assertEqual(initiate.status_code, 200)

    fulfill_url = loads(initiate.content)["url"]
    fulfill = self.client.get(fulfill_url, **agent_headers)
    self.assertEqual(fulfill.status_code, 302)

    redirect = urlparse(fulfill.url)
    self.assertEqual(redirect.scheme, "goauthentik.io")

    # The token is only inspected here, so its signature is not verified.
    raw_token = parse_qs(redirect.query)[QS_AGENT_IA_TOKEN][0]
    claims = decode(raw_token.encode(), options={"verify_signature": False})
    self.assertEqual(claims["iss"], "goauthentik.io/platform")
    self.assertEqual(claims["aud"], str(self.device.pk))