authentik.providers.scim.clients.base
SCIM Client
1"""SCIM Client""" 2 3from typing import TYPE_CHECKING 4 5from django.core.cache import cache 6from django.http import HttpResponseBadRequest, HttpResponseNotFound 7from pydantic import ValidationError 8from requests import RequestException, Session 9 10from authentik.lib.sync.outgoing import ( 11 HTTP_CONFLICT, 12 HTTP_NO_CONTENT, 13 HTTP_SERVICE_UNAVAILABLE, 14 HTTP_TOO_MANY_REQUESTS, 15) 16from authentik.lib.sync.outgoing.base import SAFE_METHODS, BaseOutgoingSyncClient 17from authentik.lib.sync.outgoing.exceptions import ( 18 DryRunRejected, 19 NotFoundSyncException, 20 ObjectExistsSyncException, 21 TransientSyncException, 22) 23from authentik.lib.utils.http import get_http_session 24from authentik.providers.scim.clients.exceptions import SCIMRequestException 25from authentik.providers.scim.clients.schema import ServiceProviderConfiguration 26from authentik.providers.scim.models import SCIMCompatibilityMode, SCIMProvider 27 28if TYPE_CHECKING: 29 from django.db.models import Model 30 from pydantic import BaseModel 31 32 33class SCIMClient[TModel: "Model", TConnection: "Model", TSchema: "BaseModel"]( 34 BaseOutgoingSyncClient[TModel, TConnection, TSchema, SCIMProvider] 35): 36 """SCIM Client""" 37 38 base_url: str 39 40 _session: Session 41 _config: ServiceProviderConfiguration 42 43 def __init__(self, provider: SCIMProvider): 44 super().__init__(provider) 45 self._session = get_http_session() 46 self._session.verify = provider.verify_certificates 47 self.provider = provider 48 self.auth = provider.scim_auth() 49 # Remove trailing slashes as we assume the URL doesn't have any 50 base_url = provider.url 51 if base_url.endswith("/"): 52 base_url = base_url[:-1] 53 self.base_url = base_url 54 self._config = self.get_service_provider_config() 55 56 def _request(self, method: str, path: str, **kwargs) -> dict: 57 """Wrapper to send a request to the full URL""" 58 if self.provider.dry_run and method.upper() not in SAFE_METHODS: 59 raise 
DryRunRejected(f"{self.base_url}{path}", method, body=kwargs.get("json")) 60 try: 61 response = self._session.request( 62 method, 63 f"{self.base_url}{path}", 64 **kwargs, 65 auth=self.auth, 66 headers={ 67 "Accept": "application/scim+json", 68 "Content-Type": "application/scim+json", 69 }, 70 ) 71 except RequestException as exc: 72 raise SCIMRequestException(message="Failed to send request") from exc 73 self.logger.debug("scim request", path=path, method=method, **kwargs) 74 if response.status_code >= HttpResponseBadRequest.status_code: 75 if response.status_code == HttpResponseNotFound.status_code: 76 raise NotFoundSyncException(response) 77 if response.status_code in [HTTP_TOO_MANY_REQUESTS, HTTP_SERVICE_UNAVAILABLE]: 78 raise TransientSyncException() 79 if response.status_code == HTTP_CONFLICT: 80 raise ObjectExistsSyncException(response) 81 self.logger.warning( 82 "Failed to send SCIM request", path=path, method=method, response=response.text 83 ) 84 raise SCIMRequestException(response) 85 if response.status_code == HTTP_NO_CONTENT: 86 return {} 87 return response.json() 88 89 def get_service_provider_config(self): 90 """Get Service provider config""" 91 default_config = ServiceProviderConfiguration.default() 92 timeout_seconds = self.provider.service_provider_config_cache_timeout_seconds 93 cache_key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config" 94 95 # Check cache first 96 cached_config = cache.get(cache_key) if timeout_seconds > 0 else None 97 if cached_config is not None: 98 return cached_config 99 100 # Attempt to fetch from remote 101 path = "/ServiceProviderConfig" 102 if self.provider.compatibility_mode == SCIMCompatibilityMode.SALESFORCE: 103 path = "/ServiceProviderConfigs" 104 105 try: 106 config = ServiceProviderConfiguration.model_validate(self._request("GET", path)) 107 if self.provider.compatibility_mode == SCIMCompatibilityMode.AWS: 108 config.patch.supported = False 109 if self.provider.compatibility_mode == 
SCIMCompatibilityMode.SLACK: 110 config.filter.supported = True 111 except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc: 112 self.logger.warning( 113 "failed to get ServiceProviderConfig, using default", 114 exc=exc, 115 ) 116 config = default_config 117 118 # Cache the config (either successfully fetched or default) 119 if timeout_seconds > 0: 120 cache.set(cache_key, config, timeout_seconds) 121 else: 122 cache.delete(cache_key) 123 return config
class
SCIMClient(authentik.lib.sync.outgoing.base.BaseOutgoingSyncClient[TModel, TConnection, TSchema, authentik.providers.scim.models.SCIMProvider], typing.Generic[TModel, TConnection, TSchema]):
34class SCIMClient[TModel: "Model", TConnection: "Model", TSchema: "BaseModel"]( 35 BaseOutgoingSyncClient[TModel, TConnection, TSchema, SCIMProvider] 36): 37 """SCIM Client""" 38 39 base_url: str 40 41 _session: Session 42 _config: ServiceProviderConfiguration 43 44 def __init__(self, provider: SCIMProvider): 45 super().__init__(provider) 46 self._session = get_http_session() 47 self._session.verify = provider.verify_certificates 48 self.provider = provider 49 self.auth = provider.scim_auth() 50 # Remove trailing slashes as we assume the URL doesn't have any 51 base_url = provider.url 52 if base_url.endswith("/"): 53 base_url = base_url[:-1] 54 self.base_url = base_url 55 self._config = self.get_service_provider_config() 56 57 def _request(self, method: str, path: str, **kwargs) -> dict: 58 """Wrapper to send a request to the full URL""" 59 if self.provider.dry_run and method.upper() not in SAFE_METHODS: 60 raise DryRunRejected(f"{self.base_url}{path}", method, body=kwargs.get("json")) 61 try: 62 response = self._session.request( 63 method, 64 f"{self.base_url}{path}", 65 **kwargs, 66 auth=self.auth, 67 headers={ 68 "Accept": "application/scim+json", 69 "Content-Type": "application/scim+json", 70 }, 71 ) 72 except RequestException as exc: 73 raise SCIMRequestException(message="Failed to send request") from exc 74 self.logger.debug("scim request", path=path, method=method, **kwargs) 75 if response.status_code >= HttpResponseBadRequest.status_code: 76 if response.status_code == HttpResponseNotFound.status_code: 77 raise NotFoundSyncException(response) 78 if response.status_code in [HTTP_TOO_MANY_REQUESTS, HTTP_SERVICE_UNAVAILABLE]: 79 raise TransientSyncException() 80 if response.status_code == HTTP_CONFLICT: 81 raise ObjectExistsSyncException(response) 82 self.logger.warning( 83 "Failed to send SCIM request", path=path, method=method, response=response.text 84 ) 85 raise SCIMRequestException(response) 86 if response.status_code == HTTP_NO_CONTENT: 87 return {} 88 
return response.json() 89 90 def get_service_provider_config(self): 91 """Get Service provider config""" 92 default_config = ServiceProviderConfiguration.default() 93 timeout_seconds = self.provider.service_provider_config_cache_timeout_seconds 94 cache_key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config" 95 96 # Check cache first 97 cached_config = cache.get(cache_key) if timeout_seconds > 0 else None 98 if cached_config is not None: 99 return cached_config 100 101 # Attempt to fetch from remote 102 path = "/ServiceProviderConfig" 103 if self.provider.compatibility_mode == SCIMCompatibilityMode.SALESFORCE: 104 path = "/ServiceProviderConfigs" 105 106 try: 107 config = ServiceProviderConfiguration.model_validate(self._request("GET", path)) 108 if self.provider.compatibility_mode == SCIMCompatibilityMode.AWS: 109 config.patch.supported = False 110 if self.provider.compatibility_mode == SCIMCompatibilityMode.SLACK: 111 config.filter.supported = True 112 except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc: 113 self.logger.warning( 114 "failed to get ServiceProviderConfig, using default", 115 exc=exc, 116 ) 117 config = default_config 118 119 # Cache the config (either successfully fetched or default) 120 if timeout_seconds > 0: 121 cache.set(cache_key, config, timeout_seconds) 122 else: 123 cache.delete(cache_key) 124 return config
SCIM Client
SCIMClient(provider: authentik.providers.scim.models.SCIMProvider)
def __init__(self, provider: SCIMProvider):
    """Prepare session, auth and base URL for `provider`, then load its config.

    The HTTP session's certificate verification follows
    `provider.verify_certificates`, and the authentication object comes from
    `provider.scim_auth()`.
    """
    super().__init__(provider)
    self.provider = provider
    session = get_http_session()
    session.verify = provider.verify_certificates
    self._session = session
    self.auth = provider.scim_auth()
    # Request paths are appended directly to base_url, so drop one trailing slash
    self.base_url = provider.url.removesuffix("/")
    # Fetching the config issues a request, so everything above must be set first
    self._config = self.get_service_provider_config()
def
get_service_provider_config(self):
def get_service_provider_config(self):
    """Get Service provider config

    Returns the cached configuration when caching is enabled (timeout > 0)
    and an entry exists. Otherwise the configuration is requested from the
    remote; if that fails with a validation, request or not-found error the
    default configuration is used instead. The result is stored in the cache
    when caching is enabled, and the cache entry is deleted when it is
    disabled.
    """
    fallback = ServiceProviderConfiguration.default()
    ttl = self.provider.service_provider_config_cache_timeout_seconds
    key = f"goauthentik.io/providers/scim/{self.provider.pk}/service_provider_config"
    caching_enabled = ttl > 0

    # Check cache first
    if caching_enabled:
        hit = cache.get(key)
        if hit is not None:
            return hit

    # Attempt to fetch from remote; Salesforce mode uses the plural endpoint
    mode = self.provider.compatibility_mode
    endpoint = (
        "/ServiceProviderConfigs"
        if mode == SCIMCompatibilityMode.SALESFORCE
        else "/ServiceProviderConfig"
    )

    try:
        payload = self._request("GET", endpoint)
        config = ServiceProviderConfiguration.model_validate(payload)
        # Compatibility overrides applied on top of what the remote reports
        if mode == SCIMCompatibilityMode.AWS:
            config.patch.supported = False
        if mode == SCIMCompatibilityMode.SLACK:
            config.filter.supported = True
    except (ValidationError, SCIMRequestException, NotFoundSyncException) as exc:
        self.logger.warning(
            "failed to get ServiceProviderConfig, using default",
            exc=exc,
        )
        config = fallback

    # Cache the config (either successfully fetched or default)
    if caching_enabled:
        cache.set(key, config, ttl)
    else:
        cache.delete(key)
    return config
Get Service provider config