authentik.enterprise.endpoints.connectors.google_chrome.controller
1from json import dumps, loads 2 3from django.http import HttpRequest, HttpResponseRedirect 4from django.urls import reverse 5from googleapiclient.discovery import build 6 7from authentik.endpoints.controller import BaseController, Capabilities 8from authentik.endpoints.facts import DeviceFacts, OSFamily 9from authentik.endpoints.models import Device, DeviceConnection 10from authentik.enterprise.endpoints.connectors.google_chrome.google_schema import ( 11 DeviceSignals, 12 VerifyChallengeResponseResult, 13) 14from authentik.enterprise.endpoints.connectors.google_chrome.models import GoogleChromeConnector 15from authentik.policies.utils import delete_none_values 16 17# Header we get from chrome that initiates verified access 18HEADER_DEVICE_TRUST = "X-Device-Trust" 19# Header we send to the client with the challenge 20HEADER_ACCESS_CHALLENGE = "X-Verified-Access-Challenge" 21# Header we get back from the client that we verify with google 22HEADER_ACCESS_CHALLENGE_RESPONSE = "X-Verified-Access-Challenge-Response" 23# Header value for x-device-trust that initiates the flow 24DEVICE_TRUST_VERIFIED_ACCESS = "VerifiedAccess" 25 26 27class GoogleChromeController(BaseController[GoogleChromeConnector]): 28 29 def __init__(self, connector): 30 super().__init__(connector) 31 self.google_client = build( 32 "verifiedaccess", 33 "v2", 34 cache_discovery=False, 35 **connector.google_credentials(), 36 ) 37 38 @staticmethod 39 def vendor_identifier() -> str: 40 return "chrome.google.com" 41 42 def capabilities(self) -> list[Capabilities]: 43 return [Capabilities.STAGE_ENDPOINTS, Capabilities.ENROLL_AUTOMATIC_USER] 44 45 def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect: 46 challenge = self.google_client.challenge().generate().execute() 47 res = HttpResponseRedirect( 48 request.build_absolute_uri( 49 reverse("authentik_endpoints_connectors_google_chrome:chrome") 50 ) 51 ) 52 res[HEADER_ACCESS_CHALLENGE] = dumps(challenge) 53 return res 54 55 def validate_challenge(self, response: str) -> Device: 56 response = VerifyChallengeResponseResult( 57 self.google_client.challenge().verify(body=loads(response)).execute() 58 ) 59 # Remove deprecated string representation of deviceSignals 60 response.pop("deviceSignal", None) 61 signals = DeviceSignals(response["deviceSignals"]) 62 device, _ = Device.objects.update_or_create( 63 identifier=signals["serialNumber"], 64 defaults={ 65 "name": signals["hostname"], 66 }, 67 ) 68 conn, _ = DeviceConnection.objects.update_or_create( 69 device=device, 70 connector=self.connector, 71 ) 72 conn.create_snapshot(self.convert_data(signals)) 73 return device 74 75 def convert_os_family(self, family) -> OSFamily: 76 return { 77 "CHROME_OS": OSFamily.linux, 78 "CHROMIUM_OS": OSFamily.linux, 79 "WINDOWS": OSFamily.windows, 80 "MAC_OS_X": OSFamily.macOS, 81 "LINUX": OSFamily.linux, 82 }.get(family, OSFamily.other) 83 84 def convert_data(self, raw_signals: DeviceSignals): 85 data = { 86 "os": delete_none_values( 87 { 88 "family": self.convert_os_family(raw_signals["operatingSystem"]), 89 "version": raw_signals["osVersion"], 90 } 91 ), 92 "disks": [], 93 "network": delete_none_values( 94 { 95 "hostname": raw_signals["hostname"], 96 "interfaces": [], 97 "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED", 98 }, 99 ), 100 "hardware": delete_none_values( 101 { 102 "model": raw_signals["deviceModel"], 103 "manufacturer": raw_signals["deviceManufacturer"], 104 "serial": raw_signals["serialNumber"], 105 } 106 ), 107 "vendor": { 108 self.vendor_identifier(): { 109 "agent_version": raw_signals["browserVersion"], 110 "raw": raw_signals, 111 }, 112 }, 113 } 114 facts = DeviceFacts(data=data) 115 facts.is_valid(raise_exception=True) 116 return facts.validated_data
HEADER_DEVICE_TRUST =
'X-Device-Trust'
HEADER_ACCESS_CHALLENGE =
'X-Verified-Access-Challenge'
HEADER_ACCESS_CHALLENGE_RESPONSE =
'X-Verified-Access-Challenge-Response'
DEVICE_TRUST_VERIFIED_ACCESS =
'VerifiedAccess'
class
GoogleChromeController(authentik.endpoints.controller.BaseController[authentik.enterprise.endpoints.connectors.google_chrome.models.GoogleChromeConnector]):
28class GoogleChromeController(BaseController[GoogleChromeConnector]): 29 30 def __init__(self, connector): 31 super().__init__(connector) 32 self.google_client = build( 33 "verifiedaccess", 34 "v2", 35 cache_discovery=False, 36 **connector.google_credentials(), 37 ) 38 39 @staticmethod 40 def vendor_identifier() -> str: 41 return "chrome.google.com" 42 43 def capabilities(self) -> list[Capabilities]: 44 return [Capabilities.STAGE_ENDPOINTS, Capabilities.ENROLL_AUTOMATIC_USER] 45 46 def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect: 47 challenge = self.google_client.challenge().generate().execute() 48 res = HttpResponseRedirect( 49 request.build_absolute_uri( 50 reverse("authentik_endpoints_connectors_google_chrome:chrome") 51 ) 52 ) 53 res[HEADER_ACCESS_CHALLENGE] = dumps(challenge) 54 return res 55 56 def validate_challenge(self, response: str) -> Device: 57 response = VerifyChallengeResponseResult( 58 self.google_client.challenge().verify(body=loads(response)).execute() 59 ) 60 # Remove deprecated string representation of deviceSignals 61 response.pop("deviceSignal", None) 62 signals = DeviceSignals(response["deviceSignals"]) 63 device, _ = Device.objects.update_or_create( 64 identifier=signals["serialNumber"], 65 defaults={ 66 "name": signals["hostname"], 67 }, 68 ) 69 conn, _ = DeviceConnection.objects.update_or_create( 70 device=device, 71 connector=self.connector, 72 ) 73 conn.create_snapshot(self.convert_data(signals)) 74 return device 75 76 def convert_os_family(self, family) -> OSFamily: 77 return { 78 "CHROME_OS": OSFamily.linux, 79 "CHROMIUM_OS": OSFamily.linux, 80 "WINDOWS": OSFamily.windows, 81 "MAC_OS_X": OSFamily.macOS, 82 "LINUX": OSFamily.linux, 83 }.get(family, OSFamily.other) 84 85 def convert_data(self, raw_signals: DeviceSignals): 86 data = { 87 "os": delete_none_values( 88 { 89 "family": self.convert_os_family(raw_signals["operatingSystem"]), 90 "version": raw_signals["osVersion"], 91 } 92 ), 93 "disks": [], 94 "network": delete_none_values( 95 { 96 "hostname": raw_signals["hostname"], 97 "interfaces": [], 98 "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED", 99 }, 100 ), 101 "hardware": delete_none_values( 102 { 103 "model": raw_signals["deviceModel"], 104 "manufacturer": raw_signals["deviceManufacturer"], 105 "serial": raw_signals["serialNumber"], 106 } 107 ), 108 "vendor": { 109 self.vendor_identifier(): { 110 "agent_version": raw_signals["browserVersion"], 111 "raw": raw_signals, 112 }, 113 }, 114 } 115 facts = DeviceFacts(data=data) 116 facts.is_valid(raise_exception=True) 117 return facts.validated_data
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]:
def __getitem__(self, key: KT) -> VT:
...
# Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
try:
return mapping[key]
except KeyError:
return default
def
generate_challenge( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponseRedirect:
46 def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect: 47 challenge = self.google_client.challenge().generate().execute() 48 res = HttpResponseRedirect( 49 request.build_absolute_uri( 50 reverse("authentik_endpoints_connectors_google_chrome:chrome") 51 ) 52 ) 53 res[HEADER_ACCESS_CHALLENGE] = dumps(challenge) 54 return res
56 def validate_challenge(self, response: str) -> Device: 57 response = VerifyChallengeResponseResult( 58 self.google_client.challenge().verify(body=loads(response)).execute() 59 ) 60 # Remove deprecated string representation of deviceSignals 61 response.pop("deviceSignal", None) 62 signals = DeviceSignals(response["deviceSignals"]) 63 device, _ = Device.objects.update_or_create( 64 identifier=signals["serialNumber"], 65 defaults={ 66 "name": signals["hostname"], 67 }, 68 ) 69 conn, _ = DeviceConnection.objects.update_or_create( 70 device=device, 71 connector=self.connector, 72 ) 73 conn.create_snapshot(self.convert_data(signals)) 74 return device
def
convert_data( self, raw_signals: authentik.enterprise.endpoints.connectors.google_chrome.google_schema.DeviceSignals):
85 def convert_data(self, raw_signals: DeviceSignals): 86 data = { 87 "os": delete_none_values( 88 { 89 "family": self.convert_os_family(raw_signals["operatingSystem"]), 90 "version": raw_signals["osVersion"], 91 } 92 ), 93 "disks": [], 94 "network": delete_none_values( 95 { 96 "hostname": raw_signals["hostname"], 97 "interfaces": [], 98 "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED", 99 }, 100 ), 101 "hardware": delete_none_values( 102 { 103 "model": raw_signals["deviceModel"], 104 "manufacturer": raw_signals["deviceManufacturer"], 105 "serial": raw_signals["serialNumber"], 106 } 107 ), 108 "vendor": { 109 self.vendor_identifier(): { 110 "agent_version": raw_signals["browserVersion"], 111 "raw": raw_signals, 112 }, 113 }, 114 } 115 facts = DeviceFacts(data=data) 116 facts.is_valid(raise_exception=True) 117 return facts.validated_data