authentik.enterprise.endpoints.connectors.google_chrome.controller

  1from json import dumps, loads
  2
  3from django.http import HttpRequest, HttpResponseRedirect
  4from django.urls import reverse
  5from googleapiclient.discovery import build
  6
  7from authentik.endpoints.controller import BaseController, Capabilities
  8from authentik.endpoints.facts import DeviceFacts, OSFamily
  9from authentik.endpoints.models import Device, DeviceConnection
 10from authentik.enterprise.endpoints.connectors.google_chrome.google_schema import (
 11    DeviceSignals,
 12    VerifyChallengeResponseResult,
 13)
 14from authentik.enterprise.endpoints.connectors.google_chrome.models import GoogleChromeConnector
 15from authentik.policies.utils import delete_none_values
 16
 17# Header we get from chrome that initiates verified access
 18HEADER_DEVICE_TRUST = "X-Device-Trust"
 19# Header we send to the client with the challenge
 20HEADER_ACCESS_CHALLENGE = "X-Verified-Access-Challenge"
 21# Header we get back from the client that we verify with google
 22HEADER_ACCESS_CHALLENGE_RESPONSE = "X-Verified-Access-Challenge-Response"
 23# Header value for x-device-trust that initiates the flow
 24DEVICE_TRUST_VERIFIED_ACCESS = "VerifiedAccess"
 25
 26
 27class GoogleChromeController(BaseController[GoogleChromeConnector]):
 28
 29    def __init__(self, connector):
 30        super().__init__(connector)
 31        self.google_client = build(
 32            "verifiedaccess",
 33            "v2",
 34            cache_discovery=False,
 35            **connector.google_credentials(),
 36        )
 37
 38    @staticmethod
 39    def vendor_identifier() -> str:
 40        return "chrome.google.com"
 41
 42    def capabilities(self) -> list[Capabilities]:
 43        return [Capabilities.STAGE_ENDPOINTS, Capabilities.ENROLL_AUTOMATIC_USER]
 44
 45    def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect:
 46        challenge = self.google_client.challenge().generate().execute()
 47        res = HttpResponseRedirect(
 48            request.build_absolute_uri(
 49                reverse("authentik_endpoints_connectors_google_chrome:chrome")
 50            )
 51        )
 52        res[HEADER_ACCESS_CHALLENGE] = dumps(challenge)
 53        return res
 54
 55    def validate_challenge(self, response: str) -> Device:
 56        response = VerifyChallengeResponseResult(
 57            self.google_client.challenge().verify(body=loads(response)).execute()
 58        )
 59        # Remove deprecated string representation of deviceSignals
 60        response.pop("deviceSignal", None)
 61        signals = DeviceSignals(response["deviceSignals"])
 62        device, _ = Device.objects.update_or_create(
 63            identifier=signals["serialNumber"],
 64            defaults={
 65                "name": signals["hostname"],
 66            },
 67        )
 68        conn, _ = DeviceConnection.objects.update_or_create(
 69            device=device,
 70            connector=self.connector,
 71        )
 72        conn.create_snapshot(self.convert_data(signals))
 73        return device
 74
 75    def convert_os_family(self, family) -> OSFamily:
 76        return {
 77            "CHROME_OS": OSFamily.linux,
 78            "CHROMIUM_OS": OSFamily.linux,
 79            "WINDOWS": OSFamily.windows,
 80            "MAC_OS_X": OSFamily.macOS,
 81            "LINUX": OSFamily.linux,
 82        }.get(family, OSFamily.other)
 83
 84    def convert_data(self, raw_signals: DeviceSignals):
 85        data = {
 86            "os": delete_none_values(
 87                {
 88                    "family": self.convert_os_family(raw_signals["operatingSystem"]),
 89                    "version": raw_signals["osVersion"],
 90                }
 91            ),
 92            "disks": [],
 93            "network": delete_none_values(
 94                {
 95                    "hostname": raw_signals["hostname"],
 96                    "interfaces": [],
 97                    "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED",
 98                },
 99            ),
100            "hardware": delete_none_values(
101                {
102                    "model": raw_signals["deviceModel"],
103                    "manufacturer": raw_signals["deviceManufacturer"],
104                    "serial": raw_signals["serialNumber"],
105                }
106            ),
107            "vendor": {
108                self.vendor_identifier(): {
109                    "agent_version": raw_signals["browserVersion"],
110                    "raw": raw_signals,
111                },
112            },
113        }
114        facts = DeviceFacts(data=data)
115        facts.is_valid(raise_exception=True)
116        return facts.validated_data
# Request header sent by Chrome to start the verified access flow
HEADER_DEVICE_TRUST = "X-Device-Trust"
# Response header carrying the challenge handed to the client
HEADER_ACCESS_CHALLENGE = "X-Verified-Access-Challenge"
# Request header carrying the client's answer, which is verified with Google
HEADER_ACCESS_CHALLENGE_RESPONSE = "X-Verified-Access-Challenge-Response"
# Value of the X-Device-Trust header that starts the flow
DEVICE_TRUST_VERIFIED_ACCESS = "VerifiedAccess"
 28class GoogleChromeController(BaseController[GoogleChromeConnector]):
 29
 30    def __init__(self, connector):
 31        super().__init__(connector)
 32        self.google_client = build(
 33            "verifiedaccess",
 34            "v2",
 35            cache_discovery=False,
 36            **connector.google_credentials(),
 37        )
 38
 39    @staticmethod
 40    def vendor_identifier() -> str:
 41        return "chrome.google.com"
 42
 43    def capabilities(self) -> list[Capabilities]:
 44        return [Capabilities.STAGE_ENDPOINTS, Capabilities.ENROLL_AUTOMATIC_USER]
 45
 46    def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect:
 47        challenge = self.google_client.challenge().generate().execute()
 48        res = HttpResponseRedirect(
 49            request.build_absolute_uri(
 50                reverse("authentik_endpoints_connectors_google_chrome:chrome")
 51            )
 52        )
 53        res[HEADER_ACCESS_CHALLENGE] = dumps(challenge)
 54        return res
 55
 56    def validate_challenge(self, response: str) -> Device:
 57        response = VerifyChallengeResponseResult(
 58            self.google_client.challenge().verify(body=loads(response)).execute()
 59        )
 60        # Remove deprecated string representation of deviceSignals
 61        response.pop("deviceSignal", None)
 62        signals = DeviceSignals(response["deviceSignals"])
 63        device, _ = Device.objects.update_or_create(
 64            identifier=signals["serialNumber"],
 65            defaults={
 66                "name": signals["hostname"],
 67            },
 68        )
 69        conn, _ = DeviceConnection.objects.update_or_create(
 70            device=device,
 71            connector=self.connector,
 72        )
 73        conn.create_snapshot(self.convert_data(signals))
 74        return device
 75
 76    def convert_os_family(self, family) -> OSFamily:
 77        return {
 78            "CHROME_OS": OSFamily.linux,
 79            "CHROMIUM_OS": OSFamily.linux,
 80            "WINDOWS": OSFamily.windows,
 81            "MAC_OS_X": OSFamily.macOS,
 82            "LINUX": OSFamily.linux,
 83        }.get(family, OSFamily.other)
 84
 85    def convert_data(self, raw_signals: DeviceSignals):
 86        data = {
 87            "os": delete_none_values(
 88                {
 89                    "family": self.convert_os_family(raw_signals["operatingSystem"]),
 90                    "version": raw_signals["osVersion"],
 91                }
 92            ),
 93            "disks": [],
 94            "network": delete_none_values(
 95                {
 96                    "hostname": raw_signals["hostname"],
 97                    "interfaces": [],
 98                    "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED",
 99                },
100            ),
101            "hardware": delete_none_values(
102                {
103                    "model": raw_signals["deviceModel"],
104                    "manufacturer": raw_signals["deviceManufacturer"],
105                    "serial": raw_signals["serialNumber"],
106                }
107            ),
108            "vendor": {
109                self.vendor_identifier(): {
110                    "agent_version": raw_signals["browserVersion"],
111                    "raw": raw_signals,
112                },
113            },
114        }
115        facts = DeviceFacts(data=data)
116        facts.is_valid(raise_exception=True)
117        return facts.validated_data

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
GoogleChromeController(connector)
30    def __init__(self, connector):
31        super().__init__(connector)
32        self.google_client = build(
33            "verifiedaccess",
34            "v2",
35            cache_discovery=False,
36            **connector.google_credentials(),
37        )
google_client
@staticmethod
def vendor_identifier() -> str:
39    @staticmethod
40    def vendor_identifier() -> str:
41        return "chrome.google.com"
def capabilities(self) -> list[authentik.endpoints.controller.Capabilities]:
43    def capabilities(self) -> list[Capabilities]:
44        return [Capabilities.STAGE_ENDPOINTS, Capabilities.ENROLL_AUTOMATIC_USER]
def generate_challenge(self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponseRedirect:
46    def generate_challenge(self, request: HttpRequest) -> HttpResponseRedirect:
47        challenge = self.google_client.challenge().generate().execute()
48        res = HttpResponseRedirect(
49            request.build_absolute_uri(
50                reverse("authentik_endpoints_connectors_google_chrome:chrome")
51            )
52        )
53        res[HEADER_ACCESS_CHALLENGE] = dumps(challenge)
54        return res
def validate_challenge(self, response: str) -> authentik.endpoints.models.Device:
56    def validate_challenge(self, response: str) -> Device:
57        response = VerifyChallengeResponseResult(
58            self.google_client.challenge().verify(body=loads(response)).execute()
59        )
60        # Remove deprecated string representation of deviceSignals
61        response.pop("deviceSignal", None)
62        signals = DeviceSignals(response["deviceSignals"])
63        device, _ = Device.objects.update_or_create(
64            identifier=signals["serialNumber"],
65            defaults={
66                "name": signals["hostname"],
67            },
68        )
69        conn, _ = DeviceConnection.objects.update_or_create(
70            device=device,
71            connector=self.connector,
72        )
73        conn.create_snapshot(self.convert_data(signals))
74        return device
def convert_os_family(self, family) -> authentik.endpoints.facts.OSFamily:
76    def convert_os_family(self, family) -> OSFamily:
77        return {
78            "CHROME_OS": OSFamily.linux,
79            "CHROMIUM_OS": OSFamily.linux,
80            "WINDOWS": OSFamily.windows,
81            "MAC_OS_X": OSFamily.macOS,
82            "LINUX": OSFamily.linux,
83        }.get(family, OSFamily.other)
 85    def convert_data(self, raw_signals: DeviceSignals):
 86        data = {
 87            "os": delete_none_values(
 88                {
 89                    "family": self.convert_os_family(raw_signals["operatingSystem"]),
 90                    "version": raw_signals["osVersion"],
 91                }
 92            ),
 93            "disks": [],
 94            "network": delete_none_values(
 95                {
 96                    "hostname": raw_signals["hostname"],
 97                    "interfaces": [],
 98                    "firewall_enabled": raw_signals["osFirewall"] == "OS_FIREWALL_ENABLED",
 99                },
100            ),
101            "hardware": delete_none_values(
102                {
103                    "model": raw_signals["deviceModel"],
104                    "manufacturer": raw_signals["deviceManufacturer"],
105                    "serial": raw_signals["serialNumber"],
106                }
107            ),
108            "vendor": {
109                self.vendor_identifier(): {
110                    "agent_version": raw_signals["browserVersion"],
111                    "raw": raw_signals,
112                },
113            },
114        }
115        facts = DeviceFacts(data=data)
116        facts.is_valid(raise_exception=True)
117        return facts.validated_data