authentik.enterprise.endpoints.connectors.agent.tests.test_connector_auth_ia
from hashlib import sha256
from json import loads
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse

from django.urls import reverse
from jwt import decode

from authentik.blueprints.tests import reconcile_app
from authentik.core.models import Group
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
from authentik.endpoints.connectors.agent.models import AgentConnector, DeviceToken, EnrollmentToken
from authentik.endpoints.models import Device, DeviceAccessGroup
from authentik.enterprise.endpoints.connectors.agent.views.auth_interactive import QS_AGENT_IA_TOKEN
from authentik.enterprise.license import LicenseKey
from authentik.enterprise.models import License
from authentik.enterprise.tests.test_license import expiry_valid
from authentik.flows.tests import FlowTestCase
from authentik.lib.generators import generate_id
from authentik.policies.models import PolicyBinding


class TestConnectorAuthIA(FlowTestCase):
    """Tests for the `agentconnector-auth-ia` API endpoint."""

    def setUp(self):
        """Create the connector, enrollment token, device, connection, device token and user."""
        connector_name = generate_id()
        flow = create_test_flow()
        self.connector = AgentConnector.objects.create(
            name=connector_name,
            authorization_flow=flow,
        )
        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
        self.device = Device.objects.create(name=generate_id(), identifier=generate_id())
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())
        self.user = create_test_user()

    def _agent_headers(self):
        """Return the authorization and device-token-hash headers as test-client kwargs."""
        device_key = self.device_token.key
        return {
            "HTTP_AUTHORIZATION": f"Bearer+agent {device_key}",
            "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(device_key.encode()).hexdigest(),
        }

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    def test_auth_ia_initiate(self):
        """POSTing with only the device token in the Authorization header returns 200."""
        License.objects.create(key=generate_id())
        initiate_url = reverse("authentik_api:agentconnector-auth-ia")
        bearer = f"Bearer+agent {self.device_token.key}"
        response = self.client.post(initiate_url, HTTP_AUTHORIZATION=bearer)
        self.assertEqual(response.status_code, 200)

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill(self):
        """Following the returned URL without an access group set up shows "Permission denied"."""
        License.objects.create(key=generate_id())
        self.client.force_login(self.user)
        headers = self._agent_headers()

        initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
        self.assertEqual(initiated.status_code, 200)

        # The initiate response is JSON carrying the URL to visit next.
        fulfill_url = loads(initiated.content)["url"]
        fulfilled = self.client.get(fulfill_url, **headers)
        self.assertEqual(fulfilled.status_code, 200)
        self.assertIn(b"Permission denied", fulfilled.content)

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill_policy(self):
        """With the user's group bound to the device access group, a token redirect is issued."""
        License.objects.create(key=generate_id())
        access_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.device.access_group = access_group
        self.device.save()

        user_group = Group.objects.create(name=generate_id())
        user_group.users.add(self.user)

        PolicyBinding.objects.create(target=access_group, group=user_group, order=0)

        self.client.force_login(self.user)
        headers = self._agent_headers()

        initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
        self.assertEqual(initiated.status_code, 200)

        fulfill_url = loads(initiated.content)["url"]
        fulfilled = self.client.get(fulfill_url, **headers)
        self.assertEqual(fulfilled.status_code, 302)

        redirect = urlparse(fulfilled.url)
        self.assertEqual(redirect.scheme, "goauthentik.io")

        # Only the claims are inspected here; the signature is deliberately not verified.
        raw_token = parse_qs(redirect.query)[QS_AGENT_IA_TOKEN][0]
        claims = decode(raw_token.encode(), options={"verify_signature": False})
        self.assertEqual(claims["iss"], "goauthentik.io/platform")
        self.assertEqual(claims["aud"], str(self.device.pk))
class TestConnectorAuthIA(FlowTestCase):
    """Tests for the `agentconnector-auth-ia` API endpoint."""

    def setUp(self):
        """Create the connector, enrollment token, device, connection, device token and user."""
        connector_name = generate_id()
        flow = create_test_flow()
        self.connector = AgentConnector.objects.create(
            name=connector_name,
            authorization_flow=flow,
        )
        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
        self.device = Device.objects.create(name=generate_id(), identifier=generate_id())
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())
        self.user = create_test_user()

    def _agent_headers(self):
        """Return the authorization and device-token-hash headers as test-client kwargs."""
        device_key = self.device_token.key
        return {
            "HTTP_AUTHORIZATION": f"Bearer+agent {device_key}",
            "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(device_key.encode()).hexdigest(),
        }

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    def test_auth_ia_initiate(self):
        """POSTing with only the device token in the Authorization header returns 200."""
        License.objects.create(key=generate_id())
        initiate_url = reverse("authentik_api:agentconnector-auth-ia")
        bearer = f"Bearer+agent {self.device_token.key}"
        response = self.client.post(initiate_url, HTTP_AUTHORIZATION=bearer)
        self.assertEqual(response.status_code, 200)

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill(self):
        """Following the returned URL without an access group set up shows "Permission denied"."""
        License.objects.create(key=generate_id())
        self.client.force_login(self.user)
        headers = self._agent_headers()

        initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
        self.assertEqual(initiated.status_code, 200)

        # The initiate response is JSON carrying the URL to visit next.
        fulfill_url = loads(initiated.content)["url"]
        fulfilled = self.client.get(fulfill_url, **headers)
        self.assertEqual(fulfilled.status_code, 200)
        self.assertIn(b"Permission denied", fulfilled.content)

    @patch(
        "authentik.enterprise.license.LicenseKey.validate",
        MagicMock(
            return_value=LicenseKey(
                aud="",
                exp=expiry_valid,
                name=generate_id(),
                internal_users=100,
                external_users=100,
            )
        ),
    )
    @reconcile_app("authentik_crypto")
    def test_auth_ia_fulfill_policy(self):
        """With the user's group bound to the device access group, a token redirect is issued."""
        License.objects.create(key=generate_id())
        access_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.device.access_group = access_group
        self.device.save()

        user_group = Group.objects.create(name=generate_id())
        user_group.users.add(self.user)

        PolicyBinding.objects.create(target=access_group, group=user_group, order=0)

        self.client.force_login(self.user)
        headers = self._agent_headers()

        initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
        self.assertEqual(initiated.status_code, 200)

        fulfill_url = loads(initiated.content)["url"]
        fulfilled = self.client.get(fulfill_url, **headers)
        self.assertEqual(fulfilled.status_code, 302)

        redirect = urlparse(fulfilled.url)
        self.assertEqual(redirect.scheme, "goauthentik.io")

        # Only the claims are inspected here; the signature is deliberately not verified.
        raw_token = parse_qs(redirect.query)[QS_AGENT_IA_TOKEN][0]
        claims = decode(raw_token.encode(), options={"verify_signature": False})
        self.assertEqual(claims["iss"], "goauthentik.io/platform")
        self.assertEqual(claims["aud"], str(self.device.pk))
Helpers for testing flows and stages.
def
setUp(self):
def setUp(self):
    """Create the connector, enrollment token, device, connection, device token and user."""
    connector_name = generate_id()
    flow = create_test_flow()
    self.connector = AgentConnector.objects.create(
        name=connector_name,
        authorization_flow=flow,
    )
    self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
    self.device = Device.objects.create(name=generate_id(), identifier=generate_id())
    # The device token is attached to the device/connector connection, not the device itself.
    self.connection = AgentDeviceConnection.objects.create(
        device=self.device,
        connector=self.connector,
    )
    self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())
    self.user = create_test_user()
Hook method for setting up the test fixture before exercising it.
@patch('authentik.enterprise.license.LicenseKey.validate', MagicMock(return_value=LicenseKey(aud='', exp=expiry_valid, name=generate_id(), internal_users=100, external_users=100)))
def
test_auth_ia_initiate(self):
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
def test_auth_ia_initiate(self):
    """POSTing with only the device token in the Authorization header returns 200."""
    License.objects.create(key=generate_id())
    initiate_url = reverse("authentik_api:agentconnector-auth-ia")
    bearer = f"Bearer+agent {self.device_token.key}"
    response = self.client.post(initiate_url, HTTP_AUTHORIZATION=bearer)
    self.assertEqual(response.status_code, 200)
@patch('authentik.enterprise.license.LicenseKey.validate', MagicMock(return_value=LicenseKey(aud='', exp=expiry_valid, name=generate_id(), internal_users=100, external_users=100)))
@reconcile_app('authentik_crypto')
def
test_auth_ia_fulfill(self):
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
@reconcile_app("authentik_crypto")
def test_auth_ia_fulfill(self):
    """Following the returned URL without an access group set up shows "Permission denied"."""
    License.objects.create(key=generate_id())
    self.client.force_login(self.user)

    # Both requests carry the same two headers: the bearer token and its SHA-256 hex digest.
    device_key = self.device_token.key
    headers = {
        "HTTP_AUTHORIZATION": f"Bearer+agent {device_key}",
        "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(device_key.encode()).hexdigest(),
    }

    initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
    self.assertEqual(initiated.status_code, 200)

    # The initiate response is JSON carrying the URL to visit next.
    fulfill_url = loads(initiated.content)["url"]
    fulfilled = self.client.get(fulfill_url, **headers)
    self.assertEqual(fulfilled.status_code, 200)
    self.assertIn(b"Permission denied", fulfilled.content)
@patch('authentik.enterprise.license.LicenseKey.validate', MagicMock(return_value=LicenseKey(aud='', exp=expiry_valid, name=generate_id(), internal_users=100, external_users=100)))
@reconcile_app('authentik_crypto')
def
test_auth_ia_fulfill_policy(self):
@patch(
    "authentik.enterprise.license.LicenseKey.validate",
    MagicMock(
        return_value=LicenseKey(
            aud="",
            exp=expiry_valid,
            name=generate_id(),
            internal_users=100,
            external_users=100,
        )
    ),
)
@reconcile_app("authentik_crypto")
def test_auth_ia_fulfill_policy(self):
    """With the user's group bound to the device access group, a token redirect is issued."""
    License.objects.create(key=generate_id())
    access_group = DeviceAccessGroup.objects.create(name=generate_id())
    self.device.access_group = access_group
    self.device.save()

    user_group = Group.objects.create(name=generate_id())
    user_group.users.add(self.user)

    PolicyBinding.objects.create(target=access_group, group=user_group, order=0)

    self.client.force_login(self.user)

    # Both requests carry the same two headers: the bearer token and its SHA-256 hex digest.
    device_key = self.device_token.key
    headers = {
        "HTTP_AUTHORIZATION": f"Bearer+agent {device_key}",
        "HTTP_X_AUTHENTIK_PLATFORM_AUTH_DTH": sha256(device_key.encode()).hexdigest(),
    }

    initiated = self.client.post(reverse("authentik_api:agentconnector-auth-ia"), **headers)
    self.assertEqual(initiated.status_code, 200)

    fulfill_url = loads(initiated.content)["url"]
    fulfilled = self.client.get(fulfill_url, **headers)
    self.assertEqual(fulfilled.status_code, 302)

    redirect = urlparse(fulfilled.url)
    self.assertEqual(redirect.scheme, "goauthentik.io")

    # Only the claims are inspected here; the signature is deliberately not verified.
    raw_token = parse_qs(redirect.query)[QS_AGENT_IA_TOKEN][0]
    claims = decode(raw_token.encode(), options={"verify_signature": False})
    self.assertEqual(claims["iss"], "goauthentik.io/platform")
    self.assertEqual(claims["aud"], str(self.device.pk))