authentik.endpoints.connectors.agent.tests.test_agent_api
"""API tests for the agent connector: enrollment, agent config, check-in and MDM config."""

from datetime import timedelta

from django.urls import reverse
from django.utils.timezone import now
from rest_framework.test import APITestCase

from authentik.blueprints.tests import reconcile_app
from authentik.core.tests.utils import create_test_admin_user
from authentik.endpoints.connectors.agent.api.connectors import AgentDeviceConnection
from authentik.endpoints.connectors.agent.models import AgentConnector, DeviceToken, EnrollmentToken
from authentik.endpoints.facts import OSFamily
from authentik.endpoints.models import Device, DeviceAccessGroup
from authentik.lib.generators import generate_id

# Request body posted by the check-in tests; the hardware serial is generated once at import.
CHECK_IN_DATA_VALID = {
    "disks": [],
    "hardware": {
        "cpu_count": 10,
        "cpu_name": "Apple M1 Pro",
        "manufacturer": "Apple Inc.",
        "memory_bytes": 34359738368,
        "model": "MacBookPro18,1",
        "serial": generate_id(),
    },
    "network": {
        "firewall_enabled": True,
        "hostname": "jens-mbp.lab.beryju.org",
        "interfaces": [],
    },
    "os": {"arch": "arm64", "family": "mac_os", "name": "macOS", "version": "15.7.1"},
    "processes": [],
    "vendor": {"io.goauthentik.platform": {"agent_version": "0.23.0-dev-8521"}},
}


class TestAgentAPI(APITestCase):
    """Tests for the agent connector API: enroll, agent-config, check-in and mdm-config."""

    def setUp(self):
        """Create a connector, its enrollment token, and a connected device with a token."""
        self.connector = AgentConnector.objects.create(name=generate_id())
        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
        self.device = Device.objects.create(identifier=generate_id())
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())

    # -- request helpers (underscore-prefixed so they are not collected as tests) --

    def _enroll(self, serial):
        """POST an enrollment for `serial`, authenticated with the enrollment token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-enroll"),
            data={"device_serial": serial, "device_name": "bar"},
            HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
        )

    def _agent_config(self):
        """GET the agent config, authenticated with the device token."""
        return self.client.get(
            reverse("authentik_api:agentconnector-agent-config"),
            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
        )

    def _check_in(self):
        """POST `CHECK_IN_DATA_VALID` to the check-in endpoint with the device token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-check-in"),
            data=CHECK_IN_DATA_VALID,
            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
        )

    def _mdm_config(self, platform):
        """POST an MDM config request for `platform` using the fixture connector and token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-mdm-config", kwargs={"pk": self.connector.pk}),
            data={"platform": platform, "enrollment_token": self.token.pk},
        )

    def _disable_connector(self):
        """Persist the fixture connector with `enabled` set to False."""
        self.connector.enabled = False
        self.connector.save()

    @staticmethod
    def _expire(obj):
        """Mark `obj` as expiring with an expiry one hour in the past, then save it."""
        obj.expiring = True
        obj.expires = now() - timedelta(hours=1)
        obj.save()

    # -- enrollment --

    def test_enroll(self):
        """Enrolling a new serial with the enrollment token returns 200."""
        response = self._enroll(generate_id())
        self.assertEqual(response.status_code, 200)

    def test_enroll_disabled(self):
        """Enrolling against a disabled connector returns 403."""
        self._disable_connector()
        response = self._enroll(generate_id())
        self.assertEqual(response.status_code, 403)

    def test_enroll_token_delete(self):
        """Enrolling with an existing device's identifier replaces its device token."""
        response = self._enroll(self.device.identifier)
        self.assertEqual(response.status_code, 200)
        # The token from setUp is gone and exactly one token is left for the connection.
        self.assertFalse(DeviceToken.objects.filter(pk=self.device_token.pk).exists())
        self.assertEqual(DeviceToken.objects.filter(device=self.connection).count(), 1)

    def test_enroll_group(self):
        """A device enrolled via a token with a device group gets that access group."""
        device_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.token.device_group = device_group
        self.token.save()
        ident = generate_id()
        response = self._enroll(ident)
        self.assertEqual(response.status_code, 200)
        device = Device.objects.filter(identifier=ident).first()
        self.assertIsNotNone(device)
        self.assertEqual(device.access_group, device_group)

    def test_enroll_expired(self):
        """Enrolling with an expired enrollment token returns 403 and creates no device."""
        dev_id = generate_id()
        self._expire(self.token)
        response = self._enroll(dev_id)
        self.assertEqual(response.status_code, 403)
        self.assertFalse(Device.objects.filter(identifier=dev_id).exists())

    # -- agent config --

    @reconcile_app("authentik_crypto")
    def test_config(self):
        """Fetching the agent config with a device token returns 200."""
        self.assertEqual(self._agent_config().status_code, 200)

    @reconcile_app("authentik_crypto")
    def test_config_disabled(self):
        """Fetching the agent config of a disabled connector returns 403."""
        self._disable_connector()
        self.assertEqual(self._agent_config().status_code, 403)

    # -- check-in --

    def test_check_in(self):
        """A check-in with the valid payload returns 204."""
        self.assertEqual(self._check_in().status_code, 204)

    def test_check_in_disabled(self):
        """A check-in against a disabled connector returns 403."""
        self._disable_connector()
        self.assertEqual(self._check_in().status_code, 403)

    def test_check_in_token_expired(self):
        """A check-in with an expired device token returns 403."""
        self._expire(self.device_token)
        self.assertEqual(self._check_in().status_code, 403)

    def test_check_in_device_expired(self):
        """A check-in for an expired device returns 403."""
        self._expire(self.device)
        self.assertEqual(self._check_in().status_code, 403)

    # -- MDM config --

    def test_mdm_api_wrong_platform(self):
        """Requesting an MDM config for `OSFamily.android` returns a 400 platform error."""
        self.client.force_login(create_test_admin_user())
        res = self._mdm_config(OSFamily.android)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"platform": ["Selected platform not supported"]})

    def test_mdm_api_wrong_token(self):
        """A token that belongs to another connector returns a 400 enrollment_token error."""
        self.client.force_login(create_test_admin_user())
        self.token.connector = AgentConnector.objects.create(name=generate_id())
        self.token.save()
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"enrollment_token": ["Invalid token for connector"]})

    def test_mdm_api_expired_token(self):
        """An expired enrollment token returns a 400 "Token is expired" error."""
        self.client.force_login(create_test_admin_user())
        # `_expire` also sets `expiring`, matching the other expiry tests instead of
        # relying on whatever the field's default happens to be.
        self._expire(self.token)
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"enrollment_token": ["Token is expired"]})

    def test_mdm_api(self):
        """A macOS MDM config request with a valid token returns 200."""
        self.client.force_login(create_test_admin_user())
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 200)
# Sample check-in payload; the serial shown here is a fixed literal value.
CHECK_IN_DATA_VALID = {
    "disks": [],
    "hardware": {
        "cpu_count": 10,
        "cpu_name": "Apple M1 Pro",
        "manufacturer": "Apple Inc.",
        "memory_bytes": 34359738368,
        "model": "MacBookPro18,1",
        "serial": "SNr7QHYyFseR948AzurcqnlW894DWN968S7OaNPJ",
    },
    "network": {
        "firewall_enabled": True,
        "hostname": "jens-mbp.lab.beryju.org",
        "interfaces": [],
    },
    "os": {"arch": "arm64", "family": "mac_os", "name": "macOS", "version": "15.7.1"},
    "processes": [],
    "vendor": {"io.goauthentik.platform": {"agent_version": "0.23.0-dev-8521"}},
}
class
TestAgentAPI(rest_framework.test.APITestCase):
class TestAgentAPI(APITestCase):
    """Tests for the agent connector API: enroll, agent-config, check-in and mdm-config."""

    def setUp(self):
        """Create a connector, its enrollment token, and a connected device with a token."""
        self.connector = AgentConnector.objects.create(name=generate_id())
        self.token = EnrollmentToken.objects.create(name=generate_id(), connector=self.connector)
        self.device = Device.objects.create(identifier=generate_id())
        self.connection = AgentDeviceConnection.objects.create(
            device=self.device,
            connector=self.connector,
        )
        self.device_token = DeviceToken.objects.create(device=self.connection, key=generate_id())

    # -- request helpers (underscore-prefixed so they are not collected as tests) --

    def _enroll(self, serial):
        """POST an enrollment for `serial`, authenticated with the enrollment token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-enroll"),
            data={"device_serial": serial, "device_name": "bar"},
            HTTP_AUTHORIZATION=f"Bearer {self.token.key}",
        )

    def _agent_config(self):
        """GET the agent config, authenticated with the device token."""
        return self.client.get(
            reverse("authentik_api:agentconnector-agent-config"),
            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
        )

    def _check_in(self):
        """POST `CHECK_IN_DATA_VALID` to the check-in endpoint with the device token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-check-in"),
            data=CHECK_IN_DATA_VALID,
            HTTP_AUTHORIZATION=f"Bearer+agent {self.device_token.key}",
        )

    def _mdm_config(self, platform):
        """POST an MDM config request for `platform` using the fixture connector and token."""
        return self.client.post(
            reverse("authentik_api:agentconnector-mdm-config", kwargs={"pk": self.connector.pk}),
            data={"platform": platform, "enrollment_token": self.token.pk},
        )

    def _disable_connector(self):
        """Persist the fixture connector with `enabled` set to False."""
        self.connector.enabled = False
        self.connector.save()

    @staticmethod
    def _expire(obj):
        """Mark `obj` as expiring with an expiry one hour in the past, then save it."""
        obj.expiring = True
        obj.expires = now() - timedelta(hours=1)
        obj.save()

    # -- enrollment --

    def test_enroll(self):
        """Enrolling a new serial with the enrollment token returns 200."""
        response = self._enroll(generate_id())
        self.assertEqual(response.status_code, 200)

    def test_enroll_disabled(self):
        """Enrolling against a disabled connector returns 403."""
        self._disable_connector()
        response = self._enroll(generate_id())
        self.assertEqual(response.status_code, 403)

    def test_enroll_token_delete(self):
        """Enrolling with an existing device's identifier replaces its device token."""
        response = self._enroll(self.device.identifier)
        self.assertEqual(response.status_code, 200)
        # The token from setUp is gone and exactly one token is left for the connection.
        self.assertFalse(DeviceToken.objects.filter(pk=self.device_token.pk).exists())
        self.assertEqual(DeviceToken.objects.filter(device=self.connection).count(), 1)

    def test_enroll_group(self):
        """A device enrolled via a token with a device group gets that access group."""
        device_group = DeviceAccessGroup.objects.create(name=generate_id())
        self.token.device_group = device_group
        self.token.save()
        ident = generate_id()
        response = self._enroll(ident)
        self.assertEqual(response.status_code, 200)
        device = Device.objects.filter(identifier=ident).first()
        self.assertIsNotNone(device)
        self.assertEqual(device.access_group, device_group)

    def test_enroll_expired(self):
        """Enrolling with an expired enrollment token returns 403 and creates no device."""
        dev_id = generate_id()
        self._expire(self.token)
        response = self._enroll(dev_id)
        self.assertEqual(response.status_code, 403)
        self.assertFalse(Device.objects.filter(identifier=dev_id).exists())

    # -- agent config --

    @reconcile_app("authentik_crypto")
    def test_config(self):
        """Fetching the agent config with a device token returns 200."""
        self.assertEqual(self._agent_config().status_code, 200)

    @reconcile_app("authentik_crypto")
    def test_config_disabled(self):
        """Fetching the agent config of a disabled connector returns 403."""
        self._disable_connector()
        self.assertEqual(self._agent_config().status_code, 403)

    # -- check-in --

    def test_check_in(self):
        """A check-in with the valid payload returns 204."""
        self.assertEqual(self._check_in().status_code, 204)

    def test_check_in_disabled(self):
        """A check-in against a disabled connector returns 403."""
        self._disable_connector()
        self.assertEqual(self._check_in().status_code, 403)

    def test_check_in_token_expired(self):
        """A check-in with an expired device token returns 403."""
        self._expire(self.device_token)
        self.assertEqual(self._check_in().status_code, 403)

    def test_check_in_device_expired(self):
        """A check-in for an expired device returns 403."""
        self._expire(self.device)
        self.assertEqual(self._check_in().status_code, 403)

    # -- MDM config --

    def test_mdm_api_wrong_platform(self):
        """Requesting an MDM config for `OSFamily.android` returns a 400 platform error."""
        self.client.force_login(create_test_admin_user())
        res = self._mdm_config(OSFamily.android)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"platform": ["Selected platform not supported"]})

    def test_mdm_api_wrong_token(self):
        """A token that belongs to another connector returns a 400 enrollment_token error."""
        self.client.force_login(create_test_admin_user())
        self.token.connector = AgentConnector.objects.create(name=generate_id())
        self.token.save()
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"enrollment_token": ["Invalid token for connector"]})

    def test_mdm_api_expired_token(self):
        """An expired enrollment token returns a 400 "Token is expired" error."""
        self.client.force_login(create_test_admin_user())
        # `_expire` also sets `expiring`, matching the other expiry tests instead of
        # relying on whatever the field's default happens to be.
        self._expire(self.token)
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"enrollment_token": ["Token is expired"]})

    def test_mdm_api(self):
        """A macOS MDM config request with a valid token returns 200."""
        self.client.force_login(create_test_admin_user())
        res = self._mdm_config(OSFamily.macOS)
        self.assertEqual(res.status_code, 200)
Similar to TransactionTestCase, but uses transaction.atomic() to achieve
test isolation.
In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).
On database backends with no transaction support, TestCase behaves as TransactionTestCase.
def
setUp(self):
def setUp(self):
    """Build the fixture shared by every test.

    Creates an agent connector, an enrollment token for it, a device, the
    connection between device and connector, and a device token bound to
    that connection.
    """
    connector = AgentConnector.objects.create(name=generate_id())
    enrollment_token = EnrollmentToken.objects.create(name=generate_id(), connector=connector)
    device = Device.objects.create(identifier=generate_id())
    connection = AgentDeviceConnection.objects.create(device=device, connector=connector)
    device_token = DeviceToken.objects.create(device=connection, key=generate_id())

    self.connector = connector
    self.token = enrollment_token
    self.device = device
    self.connection = connection
    self.device_token = device_token
Hook method for setting up the test fixture before exercising it.
def
test_enroll_disabled(self):
def test_enroll_disabled(self):
    """Enrolling against a disabled connector is answered with HTTP 403."""
    connector = self.connector
    connector.enabled = False
    connector.save()

    url = reverse("authentik_api:agentconnector-enroll")
    payload = {"device_serial": generate_id(), "device_name": "bar"}
    auth_header = f"Bearer {self.token.key}"
    response = self.client.post(url, data=payload, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
def
test_enroll_token_delete(self):
def test_enroll_token_delete(self):
    """Enrolling with an existing device's identifier replaces its device token."""
    url = reverse("authentik_api:agentconnector-enroll")
    payload = {"device_serial": self.device.identifier, "device_name": "bar"}
    auth_header = f"Bearer {self.token.key}"
    response = self.client.post(url, data=payload, HTTP_AUTHORIZATION=auth_header)
    self.assertEqual(response.status_code, 200)

    # The token created in setUp must no longer exist ...
    old_token_exists = DeviceToken.objects.filter(pk=self.device_token.pk).exists()
    self.assertFalse(old_token_exists)
    # ... and the connection must be left with exactly one token.
    remaining = DeviceToken.objects.filter(device=self.connection).count()
    self.assertEqual(remaining, 1)
def
test_enroll_group(self):
def test_enroll_group(self):
    """A device enrolled through a token with a device group gets that access group."""
    group = DeviceAccessGroup.objects.create(name=generate_id())
    self.token.device_group = group
    self.token.save()

    serial = generate_id()
    url = reverse("authentik_api:agentconnector-enroll")
    payload = {"device_serial": serial, "device_name": "bar"}
    auth_header = f"Bearer {self.token.key}"
    response = self.client.post(url, data=payload, HTTP_AUTHORIZATION=auth_header)
    self.assertEqual(response.status_code, 200)

    # Look the new device up by the serial that was just enrolled.
    enrolled = Device.objects.filter(identifier=serial).first()
    self.assertIsNotNone(enrolled)
    self.assertEqual(enrolled.access_group, group)
def
test_enroll_expired(self):
def test_enroll_expired(self):
    """Enrolling with an expired enrollment token returns 403 and creates no device."""
    serial = generate_id()

    enrollment_token = self.token
    enrollment_token.expiring = True
    enrollment_token.expires = now() - timedelta(hours=1)
    enrollment_token.save()

    url = reverse("authentik_api:agentconnector-enroll")
    payload = {"device_serial": serial, "device_name": "bar"}
    auth_header = f"Bearer {self.token.key}"
    response = self.client.post(url, data=payload, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
    device_created = Device.objects.filter(identifier=serial).exists()
    self.assertFalse(device_created)
@reconcile_app('authentik_crypto')
def
test_config_disabled(self):
@reconcile_app("authentik_crypto")
def test_config_disabled(self):
    """Fetching the agent config of a disabled connector is answered with HTTP 403."""
    connector = self.connector
    connector.enabled = False
    connector.save()

    url = reverse("authentik_api:agentconnector-agent-config")
    auth_header = f"Bearer+agent {self.device_token.key}"
    response = self.client.get(url, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
def
test_check_in_disabled(self):
def test_check_in_disabled(self):
    """A check-in against a disabled connector is answered with HTTP 403."""
    connector = self.connector
    connector.enabled = False
    connector.save()

    url = reverse("authentik_api:agentconnector-check-in")
    auth_header = f"Bearer+agent {self.device_token.key}"
    response = self.client.post(url, data=CHECK_IN_DATA_VALID, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
def
test_check_in_token_expired(self):
def test_check_in_token_expired(self):
    """A check-in authenticated with an expired device token returns HTTP 403."""
    device_token = self.device_token
    device_token.expiring = True
    device_token.expires = now() - timedelta(hours=1)
    device_token.save()

    url = reverse("authentik_api:agentconnector-check-in")
    auth_header = f"Bearer+agent {self.device_token.key}"
    response = self.client.post(url, data=CHECK_IN_DATA_VALID, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
def
test_check_in_device_expired(self):
def test_check_in_device_expired(self):
    """A check-in for a device whose expiry lies in the past returns HTTP 403."""
    device = self.device
    device.expiring = True
    device.expires = now() - timedelta(hours=1)
    device.save()

    url = reverse("authentik_api:agentconnector-check-in")
    auth_header = f"Bearer+agent {self.device_token.key}"
    response = self.client.post(url, data=CHECK_IN_DATA_VALID, HTTP_AUTHORIZATION=auth_header)

    self.assertEqual(response.status_code, 403)
def
test_mdm_api_wrong_platform(self):
def test_mdm_api_wrong_platform(self):
    """Requesting an MDM config for `OSFamily.android` yields a 400 platform error."""
    admin = create_test_admin_user()
    self.client.force_login(admin)

    url = reverse("authentik_api:agentconnector-mdm-config", kwargs={"pk": self.connector.pk})
    payload = {"platform": OSFamily.android, "enrollment_token": self.token.pk}
    res = self.client.post(url, data=payload)

    self.assertEqual(res.status_code, 400)
    expected = {"platform": ["Selected platform not supported"]}
    self.assertJSONEqual(res.content, expected)
def
test_mdm_api_wrong_token(self):
def test_mdm_api_wrong_token(self):
    """A token that belongs to a different connector yields a 400 enrollment_token error."""
    admin = create_test_admin_user()
    self.client.force_login(admin)

    # Re-home the enrollment token onto a second connector before calling the first one.
    self.token.connector = AgentConnector.objects.create(name=generate_id())
    self.token.save()

    url = reverse("authentik_api:agentconnector-mdm-config", kwargs={"pk": self.connector.pk})
    payload = {"platform": OSFamily.macOS, "enrollment_token": self.token.pk}
    res = self.client.post(url, data=payload)

    self.assertEqual(res.status_code, 400)
    expected = {"enrollment_token": ["Invalid token for connector"]}
    self.assertJSONEqual(res.content, expected)
def
test_mdm_api_expired_token(self):
def test_mdm_api_expired_token(self):
    """An expired enrollment token yields a 400 "Token is expired" error."""
    self.client.force_login(create_test_admin_user())
    # Set `expiring` explicitly, as the other expiry tests in this class do, so the
    # test does not depend on the field's default value.
    self.token.expiring = True
    self.token.expires = now() - timedelta(hours=1)
    self.token.save()
    res = self.client.post(
        reverse(
            "authentik_api:agentconnector-mdm-config",
            kwargs={
                "pk": self.connector.pk,
            },
        ),
        data={"platform": OSFamily.macOS, "enrollment_token": self.token.pk},
    )
    self.assertEqual(res.status_code, 400)
    self.assertJSONEqual(res.content, {"enrollment_token": ["Token is expired"]})
def
test_mdm_api(self):
def test_mdm_api(self):
    """A macOS MDM config request with the fixture token is answered with HTTP 200."""
    admin = create_test_admin_user()
    self.client.force_login(admin)

    url = reverse("authentik_api:agentconnector-mdm-config", kwargs={"pk": self.connector.pk})
    payload = {"platform": OSFamily.macOS, "enrollment_token": self.token.pk}
    res = self.client.post(url, data=payload)

    self.assertEqual(res.status_code, 200)