authentik.enterprise.endpoints.connectors.fleet.tests.test_connector

  1from json import loads
  2
  3from requests_mock import Mocker
  4from rest_framework.test import APITestCase
  5
  6from authentik.endpoints.facts import OSFamily
  7from authentik.endpoints.models import Device
  8from authentik.enterprise.endpoints.connectors.fleet.models import FleetConnector
  9from authentik.events.models import NotificationWebhookMapping
 10from authentik.lib.generators import generate_id
 11from authentik.lib.tests.utils import load_fixture
 12
 13TEST_HOST_UBUNTU = loads(load_fixture("fixtures/host_ubuntu.json"))
 14TEST_HOST_FEDORA = loads(load_fixture("fixtures/host_fedora.json"))
 15TEST_HOST_MACOS = loads(load_fixture("fixtures/host_macos.json"))
 16TEST_HOST_WINDOWS = loads(load_fixture("fixtures/host_windows.json"))
 17
 18TEST_HOST = {"hosts": [TEST_HOST_UBUNTU, TEST_HOST_MACOS, TEST_HOST_WINDOWS, TEST_HOST_FEDORA]}
 19
 20
 21class TestFleetConnector(APITestCase):
 22    def setUp(self):
 23        self.connector = FleetConnector.objects.create(
 24            name=generate_id(),
 25            url="http://localhost",
 26            token=generate_id(),
 27            map_teams_access_group=True,
 28        )
 29
 30    def test_sync(self):
 31        controller = self.connector.controller(self.connector)
 32        with Mocker() as mock:
 33            mock.get(
 34                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
 35                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
 36            )
 37            mock.get(
 38                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 39                json=TEST_HOST,
 40            )
 41            mock.get(
 42                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 43                json={"hosts": []},
 44            )
 45            controller.sync_endpoints()
 46        device = Device.objects.filter(
 47            identifier="VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60"
 48        ).first()
 49        self.assertIsNotNone(device)
 50        group = device.access_group
 51        self.assertIsNotNone(group)
 52        self.assertEqual(group.name, "prod")
 53        self.assertEqual(
 54            device.cached_facts.data,
 55            {
 56                "os": {
 57                    "arch": "x86_64",
 58                    "name": "Ubuntu",
 59                    "family": "linux",
 60                    "version": "24.04.3 LTS",
 61                },
 62                "disks": [],
 63                "vendor": {
 64                    "fleetdm.com": {
 65                        "policies": [],
 66                        "agent_version": "",
 67                        "uuid": "5a4a4d56-22b0-d77b-9ba5-0bdc8ff23b60",
 68                    }
 69                },
 70                "network": {"hostname": "ubuntu-desktop", "interfaces": []},
 71                "hardware": {
 72                    "model": "VMware20,1",
 73                    "serial": "VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60",
 74                    "cpu_count": 2,
 75                    "cpu_name": "Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz",
 76                    "manufacturer": "VMware, Inc.",
 77                    "memory_bytes": 2062721024,
 78                },
 79                "software": [],
 80            },
 81        )
 82
 83    def test_sync_headers(self):
 84        mapping = NotificationWebhookMapping.objects.create(
 85            name=generate_id(), expression="""return {"foo": "bar"}"""
 86        )
 87        self.connector.headers_mapping = mapping
 88        self.connector.save()
 89        controller = self.connector.controller(self.connector)
 90        with Mocker() as mock:
 91            mock.get(
 92                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
 93                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
 94            )
 95            mock.get(
 96                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 97                json=TEST_HOST,
 98            )
 99            mock.get(
100                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
101                json={"hosts": []},
102            )
103            controller.sync_endpoints()
104        self.assertEqual(mock.call_count, 3)
105        self.assertEqual(mock.request_history[0].method, "GET")
106        self.assertEqual(mock.request_history[0].headers["foo"], "bar")
107        self.assertEqual(mock.request_history[1].method, "GET")
108        self.assertEqual(mock.request_history[1].headers["foo"], "bar")
109        self.assertEqual(mock.request_history[2].method, "GET")
110        self.assertEqual(mock.request_history[2].headers["foo"], "bar")
111
112    def test_map_host_linux(self):
113        controller = self.connector.controller(self.connector)
114        self.assertEqual(
115            controller.map_os(TEST_HOST_UBUNTU),
116            {
117                "arch": "x86_64",
118                "family": OSFamily.linux,
119                "name": "Ubuntu",
120                "version": "24.04.3 LTS",
121            },
122        )
123        self.assertEqual(
124            controller.map_os(TEST_HOST_FEDORA),
125            {
126                "arch": "x86_64",
127                "family": OSFamily.linux,
128                "name": "Fedora Linux",
129                "version": "43.0.0",
130            },
131        )
132
133    def test_map_host_windows(self):
134        controller = self.connector.controller(self.connector)
135        self.assertEqual(
136            controller.map_os(TEST_HOST_WINDOWS),
137            {
138                "arch": "x86_64",
139                "family": OSFamily.windows,
140                "name": "Windows Server 2022 Datacenter 21H2",
141                "version": "10.0.20348.4405",
142            },
143        )
144
145    def test_map_host_macos(self):
146        controller = self.connector.controller(self.connector)
147        self.assertEqual(
148            controller.map_os(TEST_HOST_MACOS),
149            {
150                "arch": "arm64e",
151                "family": OSFamily.macOS,
152                "name": "macOS",
153                "version": "26.3",
154            },
155        )
TEST_HOST_UBUNTU = {'created_at': '2025-11-01T17:25:34Z', 'updated_at': '2026-01-23T12:58:55Z', 'software': None, 'software_updated_at': '2026-01-23T12:58:55Z', 'id': 14, 'detail_updated_at': '2026-01-23T12:58:55Z', 'label_updated_at': '2026-01-23T12:58:55Z', 'policy_updated_at': '2026-01-23T12:29:58Z', 'last_enrolled_at': '2025-11-01T17:25:38Z', 'seen_time': '2026-01-23T13:17:27Z', 'refetch_requested': False, 'hostname': 'ubuntu-desktop', 'uuid': '5a4a4d56-22b0-d77b-9ba5-0bdc8ff23b60', 'platform': 'ubuntu', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Ubuntu 24.04.3 LTS', 'build': '', 'platform_like': 'debian', 'code_name': 'noble', 'uptime': 1631433000000000, 'memory': 2062721024, 'cpu_type': 'x86_64', 'cpu_subtype': '165', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 2, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': 'None', 'hardware_serial': 'VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60', 'computer_name': 'ubuntu-desktop', 'public_ip': '92.116.178.120', 'primary_ip': '10.120.20.61', 'primary_mac': '00:0c:29:f2:3b:60', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 7.37, 'percent_disk_space_available': 31, 'gigs_total_disk_space': 23.08, 'gigs_all_disk_space': 23.08, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 0, 'total_issues_count': 0}, 'device_mapping': None, 'mdm': {'enrollment_status': None, 'dep_profile_error': False, 'server_url': None, 'name': '', 'encryption_key_available': False, 'connected_to_fleet': False}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-01-04T15:48:22.390118Z', 'status': 'online', 'display_text': 'ubuntu-desktop', 'display_name': 'ubuntu-desktop'}
TEST_HOST_FEDORA = {'created_at': '2026-01-23T15:32:17Z', 'updated_at': '2026-01-23T15:32:28Z', 'software': None, 'software_updated_at': '2026-01-23T15:32:17Z', 'id': 16, 'detail_updated_at': '1970-01-02T00:00:00Z', 'label_updated_at': '1970-01-02T00:00:00Z', 'policy_updated_at': '1970-01-02T00:00:00Z', 'last_enrolled_at': '2026-01-23T15:32:19Z', 'seen_time': '2026-01-23T15:32:21Z', 'refetch_requested': True, 'hostname': 'fedora-workstation', 'uuid': '578c4d56-aff8-0793-14ae-7947392f5fec', 'platform': 'rhel', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Fedora Linux 43.0.0', 'build': '', 'platform_like': 'rhel', 'code_name': '', 'uptime': 0, 'memory': 4092518400, 'cpu_type': 'x86_64', 'cpu_subtype': '165', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 2, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': 'None', 'hardware_serial': 'VMware-56 4d 8c 57 f8 af 93 07-14 ae 79 47 39 2f 5f ec', 'computer_name': 'fedora-workstation', 'public_ip': '', 'primary_ip': '', 'primary_mac': '', 'distributed_interval': 10, 'config_tls_refresh': 0, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 0, 'percent_disk_space_available': 0, 'gigs_total_disk_space': 0, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 0, 'total_issues_count': 0}}
TEST_HOST_MACOS = {'created_at': '2026-02-18T16:31:34Z', 'updated_at': '2026-03-18T11:29:18Z', 'software': None, 'software_updated_at': '2026-03-18T11:29:17Z', 'id': 19, 'detail_updated_at': '2026-03-18T11:29:18Z', 'label_updated_at': '2026-03-18T11:29:18Z', 'policy_updated_at': '2026-03-18T11:29:18Z', 'last_enrolled_at': '2026-02-18T16:31:45Z', 'seen_time': '2026-03-18T11:31:34Z', 'refetch_requested': False, 'hostname': 'jens-mac-vm.local', 'uuid': '5BF422D6-6EAB-5156-AC5A-9EADC9524713', 'platform': 'darwin', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'macOS 26.3', 'build': '25D125', 'platform_like': 'darwin', 'code_name': '', 'uptime': 653014000000000, 'memory': 4294967296, 'cpu_type': 'arm64e', 'cpu_subtype': 'ARM64E', 'cpu_brand': 'Apple M1 Pro (Virtual)', 'cpu_physical_cores': 8, 'cpu_logical_cores': 8, 'hardware_vendor': 'Apple Inc.', 'hardware_model': 'VirtualMac2,1', 'hardware_version': '', 'hardware_serial': 'ZV35VFDD50', 'computer_name': 'jens-mac-vm', 'timezone': None, 'public_ip': '92.116.179.252', 'primary_ip': '192.168.64.7', 'primary_mac': '5e:72:1c:89:98:29', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 5, 'pack_stats': None, 'team_name': 'dev', 'gigs_disk_space_available': 16.52, 'percent_disk_space_available': 26, 'gigs_total_disk_space': 62.83, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 1, 'critical_vulnerabilities_count': 0, 'total_issues_count': 1}, 'device_mapping': None, 'mdm': {'enrollment_status': 'On (manual)', 'dep_profile_error': False, 'server_url': 'https://fleet.beryjuio-prod.k8s.beryju.io/mdm/apple/mdm', 'name': 'Fleet', 'encryption_key_available': False, 'connected_to_fleet': True}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-03-10T22:05:44.00887Z', 'status': 'online', 'display_text': 'jens-mac-vm.local', 'display_name': 'jens-mac-vm', 'fleet_id': 5, 'fleet_name': 
'dev'}
TEST_HOST_WINDOWS = {'created_at': '2025-10-19T12:44:09Z', 'updated_at': '2026-01-23T13:11:45Z', 'software': None, 'software_updated_at': '2026-01-22T06:57:30Z', 'id': 13, 'detail_updated_at': '2026-01-23T12:51:35Z', 'label_updated_at': '2026-01-23T12:51:35Z', 'policy_updated_at': '2026-01-23T13:11:45Z', 'last_enrolled_at': '2025-11-05T20:27:14Z', 'seen_time': '2026-01-23T13:17:33Z', 'refetch_requested': False, 'hostname': 'windows-server', 'uuid': 'CFF12F42-9F7D-A575-2C48-01BDC6A733FB', 'platform': 'windows', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Windows Server 2022 Datacenter 21H2 10.0.20348.4405', 'build': '20348', 'platform_like': 'windows', 'code_name': 'Microsoft Windows Server 2022 Datacenter', 'uptime': 217075000000000, 'memory': 4294967296, 'cpu_type': 'x86_64', 'cpu_subtype': '-1', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 1, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': '-1', 'hardware_serial': 'VMware-42 2f f1 cf 7d 9f 75 a5-2c 48 01 bd c6 a7 33 fb', 'computer_name': 'WINDOWS-SERVER', 'public_ip': '92.116.178.120', 'primary_ip': '10.120.20.78', 'primary_mac': '00:50:56:af:fb:3a', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 68, 'percent_disk_space_available': 71, 'gigs_total_disk_space': 96, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 5, 'total_issues_count': 5}, 'device_mapping': None, 'mdm': {'enrollment_status': None, 'dep_profile_error': False, 'server_url': None, 'name': '', 'encryption_key_available': False, 'connected_to_fleet': False}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-01-21T00:33:38.178036Z', 'status': 'online', 'display_text': 'windows-server', 
'display_name': 'WINDOWS-SERVER'}
TEST_HOST = {'hosts': [{'created_at': '2025-11-01T17:25:34Z', 'updated_at': '2026-01-23T12:58:55Z', 'software': None, 'software_updated_at': '2026-01-23T12:58:55Z', 'id': 14, 'detail_updated_at': '2026-01-23T12:58:55Z', 'label_updated_at': '2026-01-23T12:58:55Z', 'policy_updated_at': '2026-01-23T12:29:58Z', 'last_enrolled_at': '2025-11-01T17:25:38Z', 'seen_time': '2026-01-23T13:17:27Z', 'refetch_requested': False, 'hostname': 'ubuntu-desktop', 'uuid': '5a4a4d56-22b0-d77b-9ba5-0bdc8ff23b60', 'platform': 'ubuntu', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Ubuntu 24.04.3 LTS', 'build': '', 'platform_like': 'debian', 'code_name': 'noble', 'uptime': 1631433000000000, 'memory': 2062721024, 'cpu_type': 'x86_64', 'cpu_subtype': '165', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 2, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': 'None', 'hardware_serial': 'VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60', 'computer_name': 'ubuntu-desktop', 'public_ip': '92.116.178.120', 'primary_ip': '10.120.20.61', 'primary_mac': '00:0c:29:f2:3b:60', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 7.37, 'percent_disk_space_available': 31, 'gigs_total_disk_space': 23.08, 'gigs_all_disk_space': 23.08, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 0, 'total_issues_count': 0}, 'device_mapping': None, 'mdm': {'enrollment_status': None, 'dep_profile_error': False, 'server_url': None, 'name': '', 'encryption_key_available': False, 'connected_to_fleet': False}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-01-04T15:48:22.390118Z', 'status': 'online', 'display_text': 'ubuntu-desktop', 'display_name': 'ubuntu-desktop'}, {'created_at': 
'2026-02-18T16:31:34Z', 'updated_at': '2026-03-18T11:29:18Z', 'software': None, 'software_updated_at': '2026-03-18T11:29:17Z', 'id': 19, 'detail_updated_at': '2026-03-18T11:29:18Z', 'label_updated_at': '2026-03-18T11:29:18Z', 'policy_updated_at': '2026-03-18T11:29:18Z', 'last_enrolled_at': '2026-02-18T16:31:45Z', 'seen_time': '2026-03-18T11:31:34Z', 'refetch_requested': False, 'hostname': 'jens-mac-vm.local', 'uuid': '5BF422D6-6EAB-5156-AC5A-9EADC9524713', 'platform': 'darwin', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'macOS 26.3', 'build': '25D125', 'platform_like': 'darwin', 'code_name': '', 'uptime': 653014000000000, 'memory': 4294967296, 'cpu_type': 'arm64e', 'cpu_subtype': 'ARM64E', 'cpu_brand': 'Apple M1 Pro (Virtual)', 'cpu_physical_cores': 8, 'cpu_logical_cores': 8, 'hardware_vendor': 'Apple Inc.', 'hardware_model': 'VirtualMac2,1', 'hardware_version': '', 'hardware_serial': 'ZV35VFDD50', 'computer_name': 'jens-mac-vm', 'timezone': None, 'public_ip': '92.116.179.252', 'primary_ip': '192.168.64.7', 'primary_mac': '5e:72:1c:89:98:29', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 5, 'pack_stats': None, 'team_name': 'dev', 'gigs_disk_space_available': 16.52, 'percent_disk_space_available': 26, 'gigs_total_disk_space': 62.83, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 1, 'critical_vulnerabilities_count': 0, 'total_issues_count': 1}, 'device_mapping': None, 'mdm': {'enrollment_status': 'On (manual)', 'dep_profile_error': False, 'server_url': 'https://fleet.beryjuio-prod.k8s.beryju.io/mdm/apple/mdm', 'name': 'Fleet', 'encryption_key_available': False, 'connected_to_fleet': True}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-03-10T22:05:44.00887Z', 'status': 'online', 'display_text': 'jens-mac-vm.local', 'display_name': 'jens-mac-vm', 'fleet_id': 5, 'fleet_name': 'dev'}, {'created_at': 
'2025-10-19T12:44:09Z', 'updated_at': '2026-01-23T13:11:45Z', 'software': None, 'software_updated_at': '2026-01-22T06:57:30Z', 'id': 13, 'detail_updated_at': '2026-01-23T12:51:35Z', 'label_updated_at': '2026-01-23T12:51:35Z', 'policy_updated_at': '2026-01-23T13:11:45Z', 'last_enrolled_at': '2025-11-05T20:27:14Z', 'seen_time': '2026-01-23T13:17:33Z', 'refetch_requested': False, 'hostname': 'windows-server', 'uuid': 'CFF12F42-9F7D-A575-2C48-01BDC6A733FB', 'platform': 'windows', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Windows Server 2022 Datacenter 21H2 10.0.20348.4405', 'build': '20348', 'platform_like': 'windows', 'code_name': 'Microsoft Windows Server 2022 Datacenter', 'uptime': 217075000000000, 'memory': 4294967296, 'cpu_type': 'x86_64', 'cpu_subtype': '-1', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 1, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': '-1', 'hardware_serial': 'VMware-42 2f f1 cf 7d 9f 75 a5-2c 48 01 bd c6 a7 33 fb', 'computer_name': 'WINDOWS-SERVER', 'public_ip': '92.116.178.120', 'primary_ip': '10.120.20.78', 'primary_mac': '00:50:56:af:fb:3a', 'distributed_interval': 10, 'config_tls_refresh': 60, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 68, 'percent_disk_space_available': 71, 'gigs_total_disk_space': 96, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 5, 'total_issues_count': 5}, 'device_mapping': None, 'mdm': {'enrollment_status': None, 'dep_profile_error': False, 'server_url': None, 'name': '', 'encryption_key_available': False, 'connected_to_fleet': False}, 'refetch_critical_queries_until': None, 'last_restarted_at': '2026-01-21T00:33:38.178036Z', 'status': 'online', 'display_text': 'windows-server', 'display_name': 'WINDOWS-SERVER'}, 
{'created_at': '2026-01-23T15:32:17Z', 'updated_at': '2026-01-23T15:32:28Z', 'software': None, 'software_updated_at': '2026-01-23T15:32:17Z', 'id': 16, 'detail_updated_at': '1970-01-02T00:00:00Z', 'label_updated_at': '1970-01-02T00:00:00Z', 'policy_updated_at': '1970-01-02T00:00:00Z', 'last_enrolled_at': '2026-01-23T15:32:19Z', 'seen_time': '2026-01-23T15:32:21Z', 'refetch_requested': True, 'hostname': 'fedora-workstation', 'uuid': '578c4d56-aff8-0793-14ae-7947392f5fec', 'platform': 'rhel', 'osquery_version': '5.21.0', 'orbit_version': None, 'fleet_desktop_version': None, 'scripts_enabled': None, 'os_version': 'Fedora Linux 43.0.0', 'build': '', 'platform_like': 'rhel', 'code_name': '', 'uptime': 0, 'memory': 4092518400, 'cpu_type': 'x86_64', 'cpu_subtype': '165', 'cpu_brand': 'Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz', 'cpu_physical_cores': 2, 'cpu_logical_cores': 2, 'hardware_vendor': 'VMware, Inc.', 'hardware_model': 'VMware20,1', 'hardware_version': 'None', 'hardware_serial': 'VMware-56 4d 8c 57 f8 af 93 07-14 ae 79 47 39 2f 5f ec', 'computer_name': 'fedora-workstation', 'public_ip': '', 'primary_ip': '', 'primary_mac': '', 'distributed_interval': 10, 'config_tls_refresh': 0, 'logger_tls_period': 10, 'team_id': 2, 'pack_stats': None, 'team_name': 'prod', 'gigs_disk_space_available': 0, 'percent_disk_space_available': 0, 'gigs_total_disk_space': 0, 'gigs_all_disk_space': None, 'issues': {'failing_policies_count': 0, 'critical_vulnerabilities_count': 0, 'total_issues_count': 0}}]}
class TestFleetConnector(rest_framework.test.APITestCase):
 22class TestFleetConnector(APITestCase):
 23    def setUp(self):
 24        self.connector = FleetConnector.objects.create(
 25            name=generate_id(),
 26            url="http://localhost",
 27            token=generate_id(),
 28            map_teams_access_group=True,
 29        )
 30
 31    def test_sync(self):
 32        controller = self.connector.controller(self.connector)
 33        with Mocker() as mock:
 34            mock.get(
 35                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
 36                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
 37            )
 38            mock.get(
 39                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 40                json=TEST_HOST,
 41            )
 42            mock.get(
 43                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 44                json={"hosts": []},
 45            )
 46            controller.sync_endpoints()
 47        device = Device.objects.filter(
 48            identifier="VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60"
 49        ).first()
 50        self.assertIsNotNone(device)
 51        group = device.access_group
 52        self.assertIsNotNone(group)
 53        self.assertEqual(group.name, "prod")
 54        self.assertEqual(
 55            device.cached_facts.data,
 56            {
 57                "os": {
 58                    "arch": "x86_64",
 59                    "name": "Ubuntu",
 60                    "family": "linux",
 61                    "version": "24.04.3 LTS",
 62                },
 63                "disks": [],
 64                "vendor": {
 65                    "fleetdm.com": {
 66                        "policies": [],
 67                        "agent_version": "",
 68                        "uuid": "5a4a4d56-22b0-d77b-9ba5-0bdc8ff23b60",
 69                    }
 70                },
 71                "network": {"hostname": "ubuntu-desktop", "interfaces": []},
 72                "hardware": {
 73                    "model": "VMware20,1",
 74                    "serial": "VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60",
 75                    "cpu_count": 2,
 76                    "cpu_name": "Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz",
 77                    "manufacturer": "VMware, Inc.",
 78                    "memory_bytes": 2062721024,
 79                },
 80                "software": [],
 81            },
 82        )
 83
 84    def test_sync_headers(self):
 85        mapping = NotificationWebhookMapping.objects.create(
 86            name=generate_id(), expression="""return {"foo": "bar"}"""
 87        )
 88        self.connector.headers_mapping = mapping
 89        self.connector.save()
 90        controller = self.connector.controller(self.connector)
 91        with Mocker() as mock:
 92            mock.get(
 93                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
 94                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
 95            )
 96            mock.get(
 97                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 98                json=TEST_HOST,
 99            )
100            mock.get(
101                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
102                json={"hosts": []},
103            )
104            controller.sync_endpoints()
105        self.assertEqual(mock.call_count, 3)
106        self.assertEqual(mock.request_history[0].method, "GET")
107        self.assertEqual(mock.request_history[0].headers["foo"], "bar")
108        self.assertEqual(mock.request_history[1].method, "GET")
109        self.assertEqual(mock.request_history[1].headers["foo"], "bar")
110        self.assertEqual(mock.request_history[2].method, "GET")
111        self.assertEqual(mock.request_history[2].headers["foo"], "bar")
112
113    def test_map_host_linux(self):
114        controller = self.connector.controller(self.connector)
115        self.assertEqual(
116            controller.map_os(TEST_HOST_UBUNTU),
117            {
118                "arch": "x86_64",
119                "family": OSFamily.linux,
120                "name": "Ubuntu",
121                "version": "24.04.3 LTS",
122            },
123        )
124        self.assertEqual(
125            controller.map_os(TEST_HOST_FEDORA),
126            {
127                "arch": "x86_64",
128                "family": OSFamily.linux,
129                "name": "Fedora Linux",
130                "version": "43.0.0",
131            },
132        )
133
134    def test_map_host_windows(self):
135        controller = self.connector.controller(self.connector)
136        self.assertEqual(
137            controller.map_os(TEST_HOST_WINDOWS),
138            {
139                "arch": "x86_64",
140                "family": OSFamily.windows,
141                "name": "Windows Server 2022 Datacenter 21H2",
142                "version": "10.0.20348.4405",
143            },
144        )
145
146    def test_map_host_macos(self):
147        controller = self.connector.controller(self.connector)
148        self.assertEqual(
149            controller.map_os(TEST_HOST_MACOS),
150            {
151                "arch": "arm64e",
152                "family": OSFamily.macOS,
153                "name": "macOS",
154                "version": "26.3",
155            },
156        )

Similar to TransactionTestCase, but uses transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
23    def setUp(self):
24        self.connector = FleetConnector.objects.create(
25            name=generate_id(),
26            url="http://localhost",
27            token=generate_id(),
28            map_teams_access_group=True,
29        )

Hook method for setting up the test fixture before exercising it.

def test_sync(self):
31    def test_sync(self):
32        controller = self.connector.controller(self.connector)
33        with Mocker() as mock:
34            mock.get(
35                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
36                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
37            )
38            mock.get(
39                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
40                json=TEST_HOST,
41            )
42            mock.get(
43                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
44                json={"hosts": []},
45            )
46            controller.sync_endpoints()
47        device = Device.objects.filter(
48            identifier="VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60"
49        ).first()
50        self.assertIsNotNone(device)
51        group = device.access_group
52        self.assertIsNotNone(group)
53        self.assertEqual(group.name, "prod")
54        self.assertEqual(
55            device.cached_facts.data,
56            {
57                "os": {
58                    "arch": "x86_64",
59                    "name": "Ubuntu",
60                    "family": "linux",
61                    "version": "24.04.3 LTS",
62                },
63                "disks": [],
64                "vendor": {
65                    "fleetdm.com": {
66                        "policies": [],
67                        "agent_version": "",
68                        "uuid": "5a4a4d56-22b0-d77b-9ba5-0bdc8ff23b60",
69                    }
70                },
71                "network": {"hostname": "ubuntu-desktop", "interfaces": []},
72                "hardware": {
73                    "model": "VMware20,1",
74                    "serial": "VMware-56 4d 4a 5a b0 22 7b d7-9b a5 0b dc 8f f2 3b 60",
75                    "cpu_count": 2,
76                    "cpu_name": "Intel(R) Core(TM) i5-10500T CPU @ 2.30GHz",
77                    "manufacturer": "VMware, Inc.",
78                    "memory_bytes": 2062721024,
79                },
80                "software": [],
81            },
82        )
def test_sync_headers(self):
 84    def test_sync_headers(self):
 85        mapping = NotificationWebhookMapping.objects.create(
 86            name=generate_id(), expression="""return {"foo": "bar"}"""
 87        )
 88        self.connector.headers_mapping = mapping
 89        self.connector.save()
 90        controller = self.connector.controller(self.connector)
 91        with Mocker() as mock:
 92            mock.get(
 93                "http://localhost/api/v1/fleet/conditional_access/idp/apple/profile",
 94                text=load_fixture("fixtures/cond_acc_profile.mobileconfig"),
 95            )
 96            mock.get(
 97                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=0&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
 98                json=TEST_HOST,
 99            )
100            mock.get(
101                "http://localhost/api/v1/fleet/hosts?order_key=hardware_serial&page=1&per_page=50&device_mapping=true&populate_software=true&populate_users=true",
102                json={"hosts": []},
103            )
104            controller.sync_endpoints()
105        self.assertEqual(mock.call_count, 3)
106        self.assertEqual(mock.request_history[0].method, "GET")
107        self.assertEqual(mock.request_history[0].headers["foo"], "bar")
108        self.assertEqual(mock.request_history[1].method, "GET")
109        self.assertEqual(mock.request_history[1].headers["foo"], "bar")
110        self.assertEqual(mock.request_history[2].method, "GET")
111        self.assertEqual(mock.request_history[2].headers["foo"], "bar")
def test_map_host_linux(self):
113    def test_map_host_linux(self):
114        controller = self.connector.controller(self.connector)
115        self.assertEqual(
116            controller.map_os(TEST_HOST_UBUNTU),
117            {
118                "arch": "x86_64",
119                "family": OSFamily.linux,
120                "name": "Ubuntu",
121                "version": "24.04.3 LTS",
122            },
123        )
124        self.assertEqual(
125            controller.map_os(TEST_HOST_FEDORA),
126            {
127                "arch": "x86_64",
128                "family": OSFamily.linux,
129                "name": "Fedora Linux",
130                "version": "43.0.0",
131            },
132        )
def test_map_host_windows(self):
134    def test_map_host_windows(self):
135        controller = self.connector.controller(self.connector)
136        self.assertEqual(
137            controller.map_os(TEST_HOST_WINDOWS),
138            {
139                "arch": "x86_64",
140                "family": OSFamily.windows,
141                "name": "Windows Server 2022 Datacenter 21H2",
142                "version": "10.0.20348.4405",
143            },
144        )
def test_map_host_macos(self):
146    def test_map_host_macos(self):
147        controller = self.connector.controller(self.connector)
148        self.assertEqual(
149            controller.map_os(TEST_HOST_MACOS),
150            {
151                "arch": "arm64e",
152                "family": OSFamily.macOS,
153                "name": "macOS",
154                "version": "26.3",
155            },
156        )