authentik.providers.scim.clients.base

SCIM Client

  1"""SCIM Client"""
  2
  3from typing import TYPE_CHECKING
  4
  5from django.core.cache import cache
  6from django.http import HttpResponseBadRequest, HttpResponseNotFound
  7from pydantic import ValidationError
  8from requests import RequestException, Session
  9
 10from authentik.lib.sync.outgoing import (
 11    HTTP_CONFLICT,
 12    HTTP_NO_CONTENT,
 13    HTTP_SERVICE_UNAVAILABLE,
 14    HTTP_TOO_MANY_REQUESTS,
 15)
 16from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient
 17from authentik.lib.sync.outgoing.exceptions import (
 18    DryRunRejected,
 19    NotFoundSyncException,
 20    ObjectExistsSyncException,
 21    TransientSyncException,
 22)
 23from authentik.lib.utils.http import get_http_session
 24from authentik.providers.scim.clients.exceptions import SCIMRequestException
 25from authentik.providers.scim.clients.schema import ServiceProviderConfiguration
 26from authentik.providers.scim.models import SCIMCompatibilityMode, SCIMProvider
 27
 28if TYPE_CHECKING:
 29    from django.db.models import Model
 30    from pydantic import BaseModel
 31
 32
 33class SCIMClient[TModel: "Model", TConnection: "Model", TSchema: "BaseModel"](
 34    BaseOutgoingSyncClient[TModel, TConnection, TSchema, SCIMProvider]
 35):
 36    """SCIM Client"""
 37
 38    base_url: str
 39
 40    _session: Session
 41    _config: ServiceProviderConfiguration
 42
 43    def __init__(self, provider: SCIMProvider):
 44        super().__init__(provider)
 45        self._session = get_http_session()
 46        self._session.verify = provider.verify_certificates
 47        self.provider = provider
 48        self.auth = provider.scim_auth()
 49        # Remove trailing slashes as we assume the URL doesn't have any
 50        base_url = provider.url
 51        if base_url.endswith("/"):
 52            base_url = base_url[:-1]
 53        self.base_url = base_url
 54        self._config = self.get_service_provider_config()
 55
 56    def _request(self, method: str, path: str, **kwargs) -> dict:
 57        """Wrapper to send a request to the full URL"""
 58        if self.provider.dry_run and method.upper() not in SAFE_METHODS:
 59            raise DryRunRejected(f"{self.base_url}{path}", method, body=kwargs.get("json"))
 60        try:
 61            response = self._session.request(
 62                method,
 63                f"{self.base_url}{path}",
 64                **kwargs,
 65                auth=self.auth,
 66                headers={
 67                    "Accept": "application/scim+json",
 68                    "Content-Type": "application/scim+json",
 69                },
 70            )
 71        except RequestException as exc:
 72            raise SCIMRequestException(message="Failed to send request") from exc
 73        self.logger.debug("scim request", path=path, method=method, **kwargs)
 74        if response.status_code >= HttpResponseBadRequest.status_code:
 75            if response.status_code == HttpResponseNotFound.status_code:
 76                raise NotFoundSyncException(response)
 77            if response.status_code in [HTTP_TOO_MANY_REQUESTS, HTTP_SERVICE_UNAVAILABLE]:
 78                raise TransientSyncException()
 79            if response.status_code == HTTP_CONFLICT:
 80                raise ObjectExistsSyncException(response)
 81            self.logger.warning(
 82                "Failed to send SCIM request", path=path, method=method, response=response.text
 83            )
 84            raise SCIMRequestException(response)
 85        if response.status_code == HTTP_NO_CONTENT:
 86            return {}
 87        return response.json()
 88
 89    def get_service_provider_config(self):
 90        """Get Service provider config"""
 91        default_config = ServiceProviderConfiguration.default()
 92        timeout_seconds = self.provider.service_provider_config_cache_timeout_seconds
 93        cache_key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config"
 94
 95        # Check cache first
 96        cached_config = cache.get(cache_key) if timeout_seconds > 0 else None
 97        if cached_config is not None:
 98            return cached_config
 99
100        # Attempt to fetch from remote
101        path = "/ServiceProviderConfig"
102        if self.provider.compatibility_mode == SCIMCompatibilityMode.SALESFORCE:
103            path = "/ServiceProviderConfigs"
104
105        try:
106            config = ServiceProviderConfiguration.model_validate(self._request("GET", path))
107            if self.provider.compatibility_mode == SCIMCompatibilityMode.AWS:
108                config.patch.supported = False
109            if self.provider.compatibility_mode == SCIMCompatibilityMode.SLACK:
110                config.filter.supported = True
111        except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc:
112            self.logger.warning(
113                "failed to get ServiceProviderConfig, using default",
114                exc=exc,
115            )
116            config = default_config
117
118        # Cache the config (either successfully fetched or default)
119        if timeout_seconds > 0:
120            cache.set(cache_key, config, timeout_seconds)
121        else:
122            cache.delete(cache_key)
123        return config
 34class SCIMClient[TModel: "Model", TConnection: "Model", TSchema: "BaseModel"](
 35    BaseOutgoingSyncClient[TModel, TConnection, TSchema, SCIMProvider]
 36):
 37    """SCIM Client"""
 38
 39    base_url: str
 40
 41    _session: Session
 42    _config: ServiceProviderConfiguration
 43
 44    def __init__(self, provider: SCIMProvider):
 45        super().__init__(provider)
 46        self._session = get_http_session()
 47        self._session.verify = provider.verify_certificates
 48        self.provider = provider
 49        self.auth = provider.scim_auth()
 50        # Remove trailing slashes as we assume the URL doesn't have any
 51        base_url = provider.url
 52        if base_url.endswith("/"):
 53            base_url = base_url[:-1]
 54        self.base_url = base_url
 55        self._config = self.get_service_provider_config()
 56
 57    def _request(self, method: str, path: str, **kwargs) -> dict:
 58        """Wrapper to send a request to the full URL"""
 59        if self.provider.dry_run and method.upper() not in SAFE_METHODS:
 60            raise DryRunRejected(f"{self.base_url}{path}", method, body=kwargs.get("json"))
 61        try:
 62            response = self._session.request(
 63                method,
 64                f"{self.base_url}{path}",
 65                **kwargs,
 66                auth=self.auth,
 67                headers={
 68                    "Accept": "application/scim+json",
 69                    "Content-Type": "application/scim+json",
 70                },
 71            )
 72        except RequestException as exc:
 73            raise SCIMRequestException(message="Failed to send request") from exc
 74        self.logger.debug("scim request", path=path, method=method, **kwargs)
 75        if response.status_code >= HttpResponseBadRequest.status_code:
 76            if response.status_code == HttpResponseNotFound.status_code:
 77                raise NotFoundSyncException(response)
 78            if response.status_code in [HTTP_TOO_MANY_REQUESTS, HTTP_SERVICE_UNAVAILABLE]:
 79                raise TransientSyncException()
 80            if response.status_code == HTTP_CONFLICT:
 81                raise ObjectExistsSyncException(response)
 82            self.logger.warning(
 83                "Failed to send SCIM request", path=path, method=method, response=response.text
 84            )
 85            raise SCIMRequestException(response)
 86        if response.status_code == HTTP_NO_CONTENT:
 87            return {}
 88        return response.json()
 89
 90    def get_service_provider_config(self):
 91        """Get Service provider config"""
 92        default_config = ServiceProviderConfiguration.default()
 93        timeout_seconds = self.provider.service_provider_config_cache_timeout_seconds
 94        cache_key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config"
 95
 96        # Check cache first
 97        cached_config = cache.get(cache_key) if timeout_seconds > 0 else None
 98        if cached_config is not None:
 99            return cached_config
100
101        # Attempt to fetch from remote
102        path = "/ServiceProviderConfig"
103        if self.provider.compatibility_mode == SCIMCompatibilityMode.SALESFORCE:
104            path = "/ServiceProviderConfigs"
105
106        try:
107            config = ServiceProviderConfiguration.model_validate(self._request("GET", path))
108            if self.provider.compatibility_mode == SCIMCompatibilityMode.AWS:
109                config.patch.supported = False
110            if self.provider.compatibility_mode == SCIMCompatibilityMode.SLACK:
111                config.filter.supported = True
112        except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc:
113            self.logger.warning(
114                "failed to get ServiceProviderConfig, using default",
115                exc=exc,
116            )
117            config = default_config
118
119        # Cache the config (either successfully fetched or default)
120        if timeout_seconds > 0:
121            cache.set(cache_key, config, timeout_seconds)
122        else:
123            cache.delete(cache_key)
124        return config

SCIM Client

SCIMClient(provider: authentik.providers.scim.models.SCIMProvider)
44    def __init__(self, provider: SCIMProvider):
45        super().__init__(provider)
46        self._session = get_http_session()
47        self._session.verify = provider.verify_certificates
48        self.provider = provider
49        self.auth = provider.scim_auth()
50        # Remove trailing slashes as we assume the URL doesn't have any
51        base_url = provider.url
52        if base_url.endswith("/"):
53            base_url = base_url[:-1]
54        self.base_url = base_url
55        self._config = self.get_service_provider_config()
base_url: str
provider
auth
def get_service_provider_config(self):
 90    def get_service_provider_config(self):
 91        """Get Service provider config"""
 92        default_config = ServiceProviderConfiguration.default()
 93        timeout_seconds = self.provider.service_provider_config_cache_timeout_seconds
 94        cache_key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config"
 95
 96        # Check cache first
 97        cached_config = cache.get(cache_key) if timeout_seconds > 0 else None
 98        if cached_config is not None:
 99            return cached_config
100
101        # Attempt to fetch from remote
102        path = "/ServiceProviderConfig"
103        if self.provider.compatibility_mode == SCIMCompatibilityMode.SALESFORCE:
104            path = "/ServiceProviderConfigs"
105
106        try:
107            config = ServiceProviderConfiguration.model_validate(self._request("GET", path))
108            if self.provider.compatibility_mode == SCIMCompatibilityMode.AWS:
109                config.patch.supported = False
110            if self.provider.compatibility_mode == SCIMCompatibilityMode.SLACK:
111                config.filter.supported = True
112        except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc:
113            self.logger.warning(
114                "failed to get ServiceProviderConfig, using default",
115                exc=exc,
116            )
117            config = default_config
118
119        # Cache the config (either successfully fetched or default)
120        if timeout_seconds > 0:
121            cache.set(cache_key, config, timeout_seconds)
122        else:
123            cache.delete(cache_key)
124        return config

Get Service provider config