authentik.enterprise.endpoints.connectors.fleet.stage

 1from cryptography.x509 import (
 2    Certificate,
 3    Extension,
 4    SubjectAlternativeName,
 5    UniformResourceIdentifier,
 6)
 7from rest_framework.exceptions import PermissionDenied
 8
 9from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256
10from authentik.endpoints.models import Device, EndpointStage, StageMode
11from authentik.enterprise.endpoints.connectors.fleet.models import FleetConnector
12from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE, MTLSStageView
13from authentik.flows.planner import PLAN_CONTEXT_DEVICE
14
15FLEET_CONDITIONAL_ACCESS_URI_PREFIX = "urn:device:apple:uuid:"
16
17
class FleetStageView(MTLSStageView):
    """mTLS stage view that resolves the connecting device via a Fleet connector.

    The certificate authority is taken from the Fleet connector referenced by the
    current stage, and the device is looked up by the URI entries found in the
    client certificate's Subject Alternative Name.
    """

    def get_authorities(self):
        """Return the CA key pair used to validate client certificates.

        Returns:
            A one-element list holding the ``CertificateKeyPair`` whose ``managed``
            value equals the connector controller's ``mtls_ca_managed``, or ``None``
            when either the connector or the key pair does not exist.
        """
        stage: EndpointStage = self.executor.current_stage
        connector = FleetConnector.objects.filter(pk=stage.connector_id).first()
        if connector is None:
            # `.first()` yields None for a missing/deleted connector; without this
            # guard the attribute access below would raise AttributeError.
            self.logger.warning("Fleet connector not found", connector=stage.connector_id)
            return None
        controller = connector.controller(connector)
        kp = CertificateKeyPair.objects.filter(managed=controller.mtls_ca_managed).first()
        return [kp] if kp else None

    def lookup_device(self, cert: Certificate, mode: StageMode):
        """Find the device matching the certificate and store it in the flow plan.

        Args:
            cert: Client certificate presented by the connecting device.
            mode: Stage mode; ``StageMode.REQUIRED`` makes a missing device fatal.

        Returns:
            The result of ``self.executor.stage_ok()``.

        Raises:
            PermissionDenied: No device matched and ``mode`` is ``StageMode.REQUIRED``.
        """
        # Scan the extensions instead of calling `get_extension_for_oid`, which raises
        # ExtensionNotFound for certificates without a SAN; such a certificate is
        # treated the same as one that carries no usable identifiers.
        san = next(
            (ext.value for ext in cert.extensions if ext.oid == SubjectAlternativeName.oid),
            None,
        )
        raw_values = san.get_values_for_type(UniformResourceIdentifier) if san is not None else []
        # Strip the Fleet URN prefix (when present) and lowercase before matching.
        values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
        self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
        device = None
        if values:
            # An empty `__in` list can never match, so the query is skipped in that case.
            device = Device.objects.filter(
                **{
                    "deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": (
                        values
                    )
                }
            ).first()
        if not device and mode == StageMode.REQUIRED:
            raise PermissionDenied("Failed to find device")
        # When the mode is not REQUIRED the context entry may be None.
        self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
        return self.executor.stage_ok()

    def dispatch(self, request, *args, **kwargs):
        """Run the stage: obtain the client certificate and resolve the device.

        The stage passes when ``get_cert`` (inherited, not shown here) returns a falsy
        value. A ``PermissionDenied`` raised along the way is converted into an
        invalid-stage response carrying the exception detail.
        """
        stage: EndpointStage = self.executor.current_stage
        try:
            cert = self.get_cert(stage.mode)
            if not cert:
                return self.executor.stage_ok()
            self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
            return self.lookup_device(cert, stage.mode)
        except PermissionDenied as exc:
            return self.executor.stage_invalid(error_message=exc.detail)
FLEET_CONDITIONAL_ACCESS_URI_PREFIX = 'urn:device:apple:uuid:'
class FleetStageView(authentik.enterprise.stages.mtls.stage.MTLSStageView):
class FleetStageView(MTLSStageView):
    """mTLS stage view that resolves the connecting device via a Fleet connector.

    The certificate authority is taken from the Fleet connector referenced by the
    current stage, and the device is looked up by the URI entries found in the
    client certificate's Subject Alternative Name.
    """

    def get_authorities(self):
        """Return the CA key pair used to validate client certificates.

        Returns:
            A one-element list holding the ``CertificateKeyPair`` whose ``managed``
            value equals the connector controller's ``mtls_ca_managed``, or ``None``
            when either the connector or the key pair does not exist.
        """
        stage: EndpointStage = self.executor.current_stage
        connector = FleetConnector.objects.filter(pk=stage.connector_id).first()
        if connector is None:
            # `.first()` yields None for a missing/deleted connector; without this
            # guard the attribute access below would raise AttributeError.
            self.logger.warning("Fleet connector not found", connector=stage.connector_id)
            return None
        controller = connector.controller(connector)
        kp = CertificateKeyPair.objects.filter(managed=controller.mtls_ca_managed).first()
        return [kp] if kp else None

    def lookup_device(self, cert: Certificate, mode: StageMode):
        """Find the device matching the certificate and store it in the flow plan.

        Args:
            cert: Client certificate presented by the connecting device.
            mode: Stage mode; ``StageMode.REQUIRED`` makes a missing device fatal.

        Returns:
            The result of ``self.executor.stage_ok()``.

        Raises:
            PermissionDenied: No device matched and ``mode`` is ``StageMode.REQUIRED``.
        """
        # Scan the extensions instead of calling `get_extension_for_oid`, which raises
        # ExtensionNotFound for certificates without a SAN; such a certificate is
        # treated the same as one that carries no usable identifiers.
        san = next(
            (ext.value for ext in cert.extensions if ext.oid == SubjectAlternativeName.oid),
            None,
        )
        raw_values = san.get_values_for_type(UniformResourceIdentifier) if san is not None else []
        # Strip the Fleet URN prefix (when present) and lowercase before matching.
        values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
        self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
        device = None
        if values:
            # An empty `__in` list can never match, so the query is skipped in that case.
            device = Device.objects.filter(
                **{
                    "deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": (
                        values
                    )
                }
            ).first()
        if not device and mode == StageMode.REQUIRED:
            raise PermissionDenied("Failed to find device")
        # When the mode is not REQUIRED the context entry may be None.
        self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
        return self.executor.stage_ok()

    def dispatch(self, request, *args, **kwargs):
        """Run the stage: obtain the client certificate and resolve the device.

        The stage passes when ``get_cert`` (inherited, not shown here) returns a falsy
        value. A ``PermissionDenied`` raised along the way is converted into an
        invalid-stage response carrying the exception detail.
        """
        stage: EndpointStage = self.executor.current_stage
        try:
            cert = self.get_cert(stage.mode)
            if not cert:
                return self.executor.stage_ok()
            self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
            return self.lookup_device(cert, stage.mode)
        except PermissionDenied as exc:
            return self.executor.stage_invalid(error_message=exc.detail)

Stage view which responds with a challenge

def get_authorities(self):
20    def get_authorities(self):
21        stage: EndpointStage = self.executor.current_stage
22        connector = FleetConnector.objects.filter(pk=stage.connector_id).first()
23        controller = connector.controller(connector)
24        kp = CertificateKeyPair.objects.filter(managed=controller.mtls_ca_managed).first()
25        return [kp] if kp else None
def lookup_device( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, mode: authentik.endpoints.models.StageMode):
27    def lookup_device(self, cert: Certificate, mode: StageMode):
28        san_ext: Extension[SubjectAlternativeName] = cert.extensions.get_extension_for_oid(
29            SubjectAlternativeName.oid
30        )
31        raw_values = san_ext.value.get_values_for_type(UniformResourceIdentifier)
32        values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
33        self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
34        device = Device.objects.filter(
35            **{"deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": values}
36        ).first()
37        if not device and mode == StageMode.REQUIRED:
38            raise PermissionDenied("Failed to find device")
39        self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
40        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
41        return self.executor.stage_ok()
def dispatch(self, request, *args, **kwargs):
43    def dispatch(self, request, *args, **kwargs):
44        stage: EndpointStage = self.executor.current_stage
45        try:
46            cert = self.get_cert(stage.mode)
47            if not cert:
48                return self.executor.stage_ok()
49            self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
50            return self.lookup_device(cert, stage.mode)
51        except PermissionDenied as exc:
52            return self.executor.stage_invalid(error_message=exc.detail)