authentik.enterprise.endpoints.connectors.fleet.stage
"""Flow stage that identifies a device from a Fleet-issued client certificate."""

from cryptography.x509 import (
    Certificate,
    Extension,
    ExtensionNotFound,
    SubjectAlternativeName,
    UniformResourceIdentifier,
)
from rest_framework.exceptions import PermissionDenied

from authentik.crypto.models import CertificateKeyPair, fingerprint_sha256
from authentik.endpoints.models import Device, EndpointStage, StageMode
from authentik.enterprise.endpoints.connectors.fleet.models import FleetConnector
from authentik.enterprise.stages.mtls.stage import PLAN_CONTEXT_CERTIFICATE, MTLSStageView
from authentik.flows.planner import PLAN_CONTEXT_DEVICE

# Prefix stripped from SAN URI values before they are compared with Fleet device UUIDs.
FLEET_CONDITIONAL_ACCESS_URI_PREFIX = "urn:device:apple:uuid:"


class FleetStageView(MTLSStageView):
    """mTLS stage view that maps a client certificate to a device known through Fleet."""

    def get_authorities(self):
        """Return the CA key pair used to validate client certificates for this stage.

        Looks up the ``FleetConnector`` referenced by the current stage and the
        ``CertificateKeyPair`` whose ``managed`` value equals the controller's
        ``mtls_ca_managed``.

        Returns:
            A single-element list holding the key pair, or ``None`` when either the
            connector or the key pair cannot be found.
        """
        stage: EndpointStage = self.executor.current_stage
        connector = FleetConnector.objects.filter(pk=stage.connector_id).first()
        if connector is None:
            # `.first()` yields None when no row matches; without this guard the
            # attribute access below would raise AttributeError.
            self.logger.warning("No Fleet connector found for stage")
            return None
        controller = connector.controller(connector)
        kp = CertificateKeyPair.objects.filter(managed=controller.mtls_ca_managed).first()
        return [kp] if kp else None

    def lookup_device(self, cert: Certificate, mode: StageMode):
        """Resolve the device identified by ``cert`` and store it in the flow plan.

        The URI entries of the certificate's SubjectAlternativeName extension are
        stripped of ``FLEET_CONDITIONAL_ACCESS_URI_PREFIX``, lower-cased and matched
        against the Fleet UUID recorded in the device fact snapshots.

        Args:
            cert: Client certificate presented for this request.
            mode: Stage mode; ``StageMode.REQUIRED`` makes a missing device fatal.

        Returns:
            The result of ``self.executor.stage_ok()``.

        Raises:
            PermissionDenied: No device matched and ``mode`` is ``StageMode.REQUIRED``.
        """
        try:
            san_ext: Extension[SubjectAlternativeName] = cert.extensions.get_extension_for_oid(
                SubjectAlternativeName.oid
            )
        except ExtensionNotFound:
            # A certificate without a SAN carries no device identifier. Treat it like
            # "no device found" instead of letting the exception escape the stage.
            self.logger.debug("Certificate has no SubjectAlternativeName extension")
            raw_values = []
        else:
            raw_values = san_ext.value.get_values_for_type(UniformResourceIdentifier)
        values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
        self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
        # The lookup key contains a dot ("fleetdm.com"), so it cannot be written as a
        # keyword argument and is passed through dict unpacking instead.
        device = Device.objects.filter(
            **{"deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": values}
        ).first()
        if not device and mode == StageMode.REQUIRED:
            raise PermissionDenied("Failed to find device")
        self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
        return self.executor.stage_ok()

    def dispatch(self, request, *args, **kwargs):
        """Handle the request: fetch the client certificate and look up its device.

        Returns ``stage_ok()`` when ``get_cert`` yields no certificate, otherwise the
        result of ``lookup_device``. A ``PermissionDenied`` raised along the way is
        converted into ``stage_invalid`` carrying the exception detail.
        """
        stage: EndpointStage = self.executor.current_stage
        try:
            cert = self.get_cert(stage.mode)
            if not cert:
                return self.executor.stage_ok()
            self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
            return self.lookup_device(cert, stage.mode)
        except PermissionDenied as exc:
            return self.executor.stage_invalid(error_message=exc.detail)
# Prefix that lookup_device strips from SAN URI values before matching device UUIDs.
FLEET_CONDITIONAL_ACCESS_URI_PREFIX =
'urn:device:apple:uuid:'
class FleetStageView(MTLSStageView):
    """mTLS stage view that maps a client certificate to a device known through Fleet."""

    def get_authorities(self):
        """Return the CA key pair used to validate client certificates for this stage.

        Looks up the ``FleetConnector`` referenced by the current stage and the
        ``CertificateKeyPair`` whose ``managed`` value equals the controller's
        ``mtls_ca_managed``.

        Returns:
            A single-element list holding the key pair, or ``None`` when either the
            connector or the key pair cannot be found.
        """
        stage: EndpointStage = self.executor.current_stage
        connector = FleetConnector.objects.filter(pk=stage.connector_id).first()
        if connector is None:
            # `.first()` yields None when no row matches; without this guard the
            # attribute access below would raise AttributeError.
            self.logger.warning("No Fleet connector found for stage")
            return None
        controller = connector.controller(connector)
        kp = CertificateKeyPair.objects.filter(managed=controller.mtls_ca_managed).first()
        return [kp] if kp else None

    def lookup_device(self, cert: Certificate, mode: StageMode):
        """Resolve the device identified by ``cert`` and store it in the flow plan.

        The URI entries of the certificate's SubjectAlternativeName extension are
        stripped of ``FLEET_CONDITIONAL_ACCESS_URI_PREFIX``, lower-cased and matched
        against the Fleet UUID recorded in the device fact snapshots.

        Args:
            cert: Client certificate presented for this request.
            mode: Stage mode; ``StageMode.REQUIRED`` makes a missing device fatal.

        Returns:
            The result of ``self.executor.stage_ok()``.

        Raises:
            PermissionDenied: No device matched and ``mode`` is ``StageMode.REQUIRED``.
        """
        # Imported here so the class does not rely on a change to the module imports.
        from cryptography.x509 import ExtensionNotFound

        try:
            san_ext: Extension[SubjectAlternativeName] = cert.extensions.get_extension_for_oid(
                SubjectAlternativeName.oid
            )
        except ExtensionNotFound:
            # A certificate without a SAN carries no device identifier. Treat it like
            # "no device found" instead of letting the exception escape the stage.
            self.logger.debug("Certificate has no SubjectAlternativeName extension")
            raw_values = []
        else:
            raw_values = san_ext.value.get_values_for_type(UniformResourceIdentifier)
        values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
        self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
        # The lookup key contains a dot ("fleetdm.com"), so it cannot be written as a
        # keyword argument and is passed through dict unpacking instead.
        device = Device.objects.filter(
            **{"deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": values}
        ).first()
        if not device and mode == StageMode.REQUIRED:
            raise PermissionDenied("Failed to find device")
        self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
        self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
        return self.executor.stage_ok()

    def dispatch(self, request, *args, **kwargs):
        """Handle the request: fetch the client certificate and look up its device.

        Returns ``stage_ok()`` when ``get_cert`` yields no certificate, otherwise the
        result of ``lookup_device``. A ``PermissionDenied`` raised along the way is
        converted into ``stage_invalid`` carrying the exception detail.
        """
        stage: EndpointStage = self.executor.current_stage
        try:
            cert = self.get_cert(stage.mode)
            if not cert:
                return self.executor.stage_ok()
            self.logger.debug("Received certificate", cert=fingerprint_sha256(cert))
            return self.lookup_device(cert, stage.mode)
        except PermissionDenied as exc:
            return self.executor.stage_invalid(error_message=exc.detail)
Stage view which responds with a challenge
def
lookup_device( self, cert: cryptography.hazmat.bindings._rust.x509.Certificate, mode: authentik.endpoints.models.StageMode):
def lookup_device(self, cert: Certificate, mode: StageMode):
    """Resolve the device identified by ``cert`` and store it in the flow plan.

    The URI entries of the certificate's SubjectAlternativeName extension are
    stripped of ``FLEET_CONDITIONAL_ACCESS_URI_PREFIX``, lower-cased and matched
    against the Fleet UUID recorded in the device fact snapshots.

    Args:
        cert: Client certificate presented for this request.
        mode: Stage mode; ``StageMode.REQUIRED`` makes a missing device fatal.

    Returns:
        The result of ``self.executor.stage_ok()``.

    Raises:
        PermissionDenied: No device matched and ``mode`` is ``StageMode.REQUIRED``.
    """
    # Imported here so the method does not rely on a change to the module imports.
    from cryptography.x509 import ExtensionNotFound

    try:
        san_ext: Extension[SubjectAlternativeName] = cert.extensions.get_extension_for_oid(
            SubjectAlternativeName.oid
        )
    except ExtensionNotFound:
        # A certificate without a SAN carries no device identifier. Treat it like
        # "no device found" instead of letting the exception escape the stage.
        self.logger.debug("Certificate has no SubjectAlternativeName extension")
        raw_values = []
    else:
        raw_values = san_ext.value.get_values_for_type(UniformResourceIdentifier)
    values = [x.removeprefix(FLEET_CONDITIONAL_ACCESS_URI_PREFIX).lower() for x in raw_values]
    self.logger.debug("Looking for devices with uuid", fleet_device_uuid=values)
    # The lookup key contains a dot ("fleetdm.com"), so it cannot be written as a
    # keyword argument and is passed through dict unpacking instead.
    device = Device.objects.filter(
        **{"deviceconnection__devicefactsnapshot__data__vendor__fleetdm.com__uuid__in": values}
    ).first()
    if not device and mode == StageMode.REQUIRED:
        raise PermissionDenied("Failed to find device")
    self.executor.plan.context[PLAN_CONTEXT_DEVICE] = device
    self.executor.plan.context[PLAN_CONTEXT_CERTIFICATE] = self._cert_to_dict(cert)
    return self.executor.stage_ok()
def
dispatch(self, request, *args, **kwargs):
def dispatch(self, request, *args, **kwargs):
    """Entry point of the stage: obtain the client certificate and resolve its device.

    When ``get_cert`` yields nothing the stage passes via ``stage_ok()``. Otherwise
    the certificate fingerprint is logged and ``lookup_device`` decides the outcome.
    Any ``PermissionDenied`` raised while doing so becomes ``stage_invalid`` with the
    exception detail as the error message.
    """
    current_stage: EndpointStage = self.executor.current_stage
    stage_mode = current_stage.mode
    try:
        client_cert = self.get_cert(stage_mode)
        if client_cert:
            self.logger.debug("Received certificate", cert=fingerprint_sha256(client_cert))
            return self.lookup_device(client_cert, stage_mode)
        # No certificate was returned; let the flow continue.
        return self.executor.stage_ok()
    except PermissionDenied as denied:
        return self.executor.stage_invalid(error_message=denied.detail)