authentik.enterprise.providers.google_workspace.clients.base
1from django.db.models import Model 2from django.http import HttpResponseBadRequest, HttpResponseNotFound 3from google.auth.exceptions import GoogleAuthError, TransportError 4from googleapiclient.discovery import build 5from googleapiclient.errors import Error, HttpError 6from googleapiclient.http import HttpRequest 7from httplib2 import HttpLib2Error, HttpLib2ErrorWithResponse 8 9from authentik.enterprise.providers.google_workspace.models import GoogleWorkspaceProvider 10from authentik.lib.sync.outgoing import HTTP_CONFLICT 11from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient 12from authentik.lib.sync.outgoing.exceptions import ( 13 BadRequestSyncException, 14 DryRunRejected, 15 NotFoundSyncException, 16 ObjectExistsSyncException, 17 StopSync, 18 TransientSyncException, 19) 20 21 22class GoogleWorkspaceSyncClient[TModel: Model, TConnection: Model, TSchema: dict]( 23 BaseOutgoingSyncClient[TModel, TConnection, TSchema, GoogleWorkspaceProvider] 24): 25 """Base client for syncing to google workspace""" 26 27 domains: list 28 29 def __init__(self, provider: GoogleWorkspaceProvider) -> None: 30 super().__init__(provider) 31 self.directory_service = build( 32 "admin", 33 "directory_v1", 34 cache_discovery=False, 35 **provider.google_credentials(), 36 ) 37 self.__prefetch_domains() 38 39 def __prefetch_domains(self): 40 self.domains = [] 41 domains = self._request(self.directory_service.domains().list(customer="my_customer")) 42 for domain in domains.get("domains", []): 43 domain_name = domain.get("domainName") 44 self.domains.append(domain_name) 45 46 def _request(self, request: HttpRequest): 47 if self.provider.dry_run and request.method.upper() not in SAFE_METHODS: 48 raise DryRunRejected(request.uri, request.method, request.body) 49 try: 50 response = request.execute() 51 except GoogleAuthError as exc: 52 if isinstance(exc, TransportError): 53 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 54 raise StopSync(exc) from exc 55 except HttpLib2Error as exc: 56 if isinstance(exc, HttpLib2ErrorWithResponse): 57 self._response_handle_status_code(request.body, exc.response.status, exc) 58 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 59 except HttpError as exc: 60 self._response_handle_status_code(request.body, exc.status_code, exc) 61 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 62 except Error as exc: 63 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 64 return response 65 66 def _response_handle_status_code(self, request: dict, status_code: int, root_exc: Exception): 67 if status_code == HttpResponseNotFound.status_code: 68 raise NotFoundSyncException("Object not found") from root_exc 69 if status_code == HTTP_CONFLICT: 70 raise ObjectExistsSyncException("Object exists") from root_exc 71 if status_code == HttpResponseBadRequest.status_code: 72 raise BadRequestSyncException("Bad request", request) from root_exc 73 74 def check_email_valid(self, *emails: str): 75 for email in emails: 76 if not any(email.endswith(f"@{domain_name}") for domain_name in self.domains): 77 raise BadRequestSyncException(f"Invalid email domain: {email}")
class
GoogleWorkspaceSyncClient(authentik.lib.sync.outgoing.base.BaseOutgoingSyncClient[TModel, TConnection, TSchema, authentik.enterprise.providers.google_workspace.models.GoogleWorkspaceProvider], typing.Generic[TModel, TConnection, TSchema]):
23class GoogleWorkspaceSyncClient[TModel: Model, TConnection: Model, TSchema: dict]( 24 BaseOutgoingSyncClient[TModel, TConnection, TSchema, GoogleWorkspaceProvider] 25): 26 """Base client for syncing to google workspace""" 27 28 domains: list 29 30 def __init__(self, provider: GoogleWorkspaceProvider) -> None: 31 super().__init__(provider) 32 self.directory_service = build( 33 "admin", 34 "directory_v1", 35 cache_discovery=False, 36 **provider.google_credentials(), 37 ) 38 self.__prefetch_domains() 39 40 def __prefetch_domains(self): 41 self.domains = [] 42 domains = self._request(self.directory_service.domains().list(customer="my_customer")) 43 for domain in domains.get("domains", []): 44 domain_name = domain.get("domainName") 45 self.domains.append(domain_name) 46 47 def _request(self, request: HttpRequest): 48 if self.provider.dry_run and request.method.upper() not in SAFE_METHODS: 49 raise DryRunRejected(request.uri, request.method, request.body) 50 try: 51 response = request.execute() 52 except GoogleAuthError as exc: 53 if isinstance(exc, TransportError): 54 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 55 raise StopSync(exc) from exc 56 except HttpLib2Error as exc: 57 if isinstance(exc, HttpLib2ErrorWithResponse): 58 self._response_handle_status_code(request.body, exc.response.status, exc) 59 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 60 except HttpError as exc: 61 self._response_handle_status_code(request.body, exc.status_code, exc) 62 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 63 except Error as exc: 64 raise TransientSyncException(f"Failed to send request: {str(exc)}") from exc 65 return response 66 67 def _response_handle_status_code(self, request: dict, status_code: int, root_exc: Exception): 68 if status_code == HttpResponseNotFound.status_code: 69 raise NotFoundSyncException("Object not found") from root_exc 70 if status_code == HTTP_CONFLICT: 71 raise ObjectExistsSyncException("Object exists") from root_exc 72 if status_code == HttpResponseBadRequest.status_code: 73 raise BadRequestSyncException("Bad request", request) from root_exc 74 75 def check_email_valid(self, *emails: str): 76 for email in emails: 77 if not any(email.endswith(f"@{domain_name}") for domain_name in self.domains): 78 raise BadRequestSyncException(f"Invalid email domain: {email}")
Base client for syncing to google workspace
GoogleWorkspaceSyncClient( provider: authentik.enterprise.providers.google_workspace.models.GoogleWorkspaceProvider)