authentik.enterprise.providers.google_workspace.clients.base

 1from django.db.models import Model
 2from django.http import HttpResponseBadRequest, HttpResponseNotFound
 3from google.auth.exceptions import GoogleAuthError, TransportError
 4from googleapiclient.discovery import build
 5from googleapiclient.errors import Error, HttpError
 6from googleapiclient.http import HttpRequest
 7from httplib2 import HttpLib2Error, HttpLib2ErrorWithResponse
 8
 9from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider
10from authentik.lib.sync.outgoing import HTTP_CONFLICT
11from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient
12from authentik.lib.sync.outgoing.exceptions import (
13    BadRequestSyncException,
14    DryRunRejected,
15    NotFoundSyncException,
16    ObjectExistsSyncException,
17    StopSync,
18    TransientSyncException,
19)
20
21
22class GoogleWorkspaceSyncClient[TModel: Model, TConnection: Model, TSchema: dict](
23    BaseOutgoingSyncClient[TModel, TConnection, TSchema, GoogleWorkspaceProvider]
24):
25    """Base client for syncing to google workspace"""
26
27    domains: list
28
29    def __init__(self, provider: GoogleWorkspaceProvider) -> None:
30        super().__init__(provider)
31        self.directory_service = build(
32            "admin",
33            "directory_v1",
34            cache_discovery=False,
35            **provider.google_credentials(),
36        )
37        self.__prefetch_domains()
38
39    def __prefetch_domains(self):
40        self.domains = []
41        domains = self._request(self.directory_service.domains().list(customer="my_customer"))
42        for domain in domains.get("domains", []):
43            domain_name = domain.get("domainName")
44            self.domains.append(domain_name)
45
46    def _request(self, request: HttpRequest):
47        if self.provider.dry_run and request.method.upper() not in SAFE_METHODS:
48            raise DryRunRejected(request.uri, request.method, request.body)
49        try:
50            response = request.execute()
51        except GoogleAuthError as exc:
52            if isinstance(exc, TransportError):
53                raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
54            raise StopSync(exc) from exc
55        except HttpLib2Error as exc:
56            if isinstance(exc, HttpLib2ErrorWithResponse):
57                self._response_handle_status_code(request.body, exc.response.status, exc)
58            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
59        except HttpError as exc:
60            self._response_handle_status_code(request.body, exc.status_code, exc)
61            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
62        except Error as exc:
63            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
64        return response
65
66    def _response_handle_status_code(self, request: dict, status_code: int, root_exc: Exception):
67        if status_code == HttpResponseNotFound.status_code:
68            raise NotFoundSyncException("Object not found") from root_exc
69        if status_code == HTTP_CONFLICT:
70            raise ObjectExistsSyncException("Object exists") from root_exc
71        if status_code == HttpResponseBadRequest.status_code:
72            raise BadRequestSyncException("Bad request", request) from root_exc
73
74    def check_email_valid(self, *emails: str):
75        for email in emails:
76            if not any(email.endswith(f"@{domain_name}") for domain_name in self.domains):
77                raise BadRequestSyncException(f"Invalid email domain: {email}")
23class GoogleWorkspaceSyncClient[TModel: Model, TConnection: Model, TSchema: dict](
24    BaseOutgoingSyncClient[TModel, TConnection, TSchema, GoogleWorkspaceProvider]
25):
26    """Base client for syncing to google workspace"""
27
28    domains: list
29
30    def __init__(self, provider: GoogleWorkspaceProvider) -> None:
31        super().__init__(provider)
32        self.directory_service = build(
33            "admin",
34            "directory_v1",
35            cache_discovery=False,
36            **provider.google_credentials(),
37        )
38        self.__prefetch_domains()
39
40    def __prefetch_domains(self):
41        self.domains = []
42        domains = self._request(self.directory_service.domains().list(customer="my_customer"))
43        for domain in domains.get("domains", []):
44            domain_name = domain.get("domainName")
45            self.domains.append(domain_name)
46
47    def _request(self, request: HttpRequest):
48        if self.provider.dry_run and request.method.upper() not in SAFE_METHODS:
49            raise DryRunRejected(request.uri, request.method, request.body)
50        try:
51            response = request.execute()
52        except GoogleAuthError as exc:
53            if isinstance(exc, TransportError):
54                raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
55            raise StopSync(exc) from exc
56        except HttpLib2Error as exc:
57            if isinstance(exc, HttpLib2ErrorWithResponse):
58                self._response_handle_status_code(request.body, exc.response.status, exc)
59            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
60        except HttpError as exc:
61            self._response_handle_status_code(request.body, exc.status_code, exc)
62            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
63        except Error as exc:
64            raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc
65        return response
66
67    def _response_handle_status_code(self, request: dict, status_code: int, root_exc: Exception):
68        if status_code == HttpResponseNotFound.status_code:
69            raise NotFoundSyncException("Object not found") from root_exc
70        if status_code == HTTP_CONFLICT:
71            raise ObjectExistsSyncException("Object exists") from root_exc
72        if status_code == HttpResponseBadRequest.status_code:
73            raise BadRequestSyncException("Bad request", request) from root_exc
74
75    def check_email_valid(self, *emails: str):
76        for email in emails:
77            if not any(email.endswith(f"@{domain_name}") for domain_name in self.domains):
78                raise BadRequestSyncException(f"Invalid email domain: {email}")

Base client for syncing to google workspace

GoogleWorkspaceSyncClient( provider: authentik.enterprise.providers.google_workspace.models.GoogleWorkspaceProvider)
30    def __init__(self, provider: GoogleWorkspaceProvider) -> None:
31        super().__init__(provider)
32        self.directory_service = build(
33            "admin",
34            "directory_v1",
35            cache_discovery=False,
36            **provider.google_credentials(),
37        )
38        self.__prefetch_domains()
domains: list
directory_service
def check_email_valid(self, *emails: str):
75    def check_email_valid(self, *emails: str):
76        for email in emails:
77            if not any(email.endswith(f"@{domain_name}") for domain_name in self.domains):
78                raise BadRequestSyncException(f"Invalid email domain: {email}")