authentik.enterprise.providers.scim.views
1from datetime import timedelta 2 3from django.core.exceptions import PermissionDenied 4from django.http import HttpRequest 5from django.shortcuts import redirect 6from django.urls import reverse 7from django.utils.timezone import now 8 9from authentik.core.models import Application 10from authentik.providers.scim.models import SCIMProvider 11from authentik.sources.oauth.clients.base import BaseOAuthClient 12from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection 13from authentik.sources.oauth.types.registry import RequestKind, registry 14from authentik.sources.oauth.views.callback import OAuthCallback 15from authentik.sources.oauth.views.redirect import OAuthRedirect 16 17 18class SCIMOAuthViewMixin: 19 20 provider: SCIMProvider 21 22 def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient: 23 source: OAuthSource = self.provider.auth_oauth 24 source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK) 25 if not source_cls.client_class: 26 return super().get_client(source, **kwargs) 27 return source_cls.client_class(source, self.request, **kwargs) 28 29 def _get_scim_provider(self, app_slug: str): 30 app = Application.objects.filter(slug=app_slug).first() 31 if not app: 32 return None 33 provider = SCIMProvider.objects.filter(backchannel_application=app) 34 return provider.first() 35 36 def dispatch(self, request: HttpRequest, application_slug: str): 37 if not request.user.is_authenticated: 38 raise PermissionDenied() 39 provider = self._get_scim_provider(application_slug) 40 if not provider or not provider.auth_oauth: 41 raise PermissionDenied() 42 if not request.user.has_perm( 43 "authentik_providers_scim.change_scimprovider", 44 provider, 45 ): 46 raise PermissionDenied() 47 self.provider = provider 48 return super().dispatch(request, source_slug=provider.auth_oauth.slug) 49 50 51class SCIMOAuthStart(SCIMOAuthViewMixin, OAuthRedirect): 52 53 def get_callback_url(self, source: OAuthSource): 54 return reverse("authentik_enterprise_providers_scim:callback", kwargs=self.kwargs) 55 56 57class SCIMRedirectCallback(SCIMOAuthViewMixin, OAuthCallback): 58 59 def redirect_flow_manager(self, client: BaseOAuthClient): 60 expires_in = int(self.token.get("expires_in", 0)) 61 UserOAuthSourceConnection.objects.update_or_create( 62 source=self.provider.auth_oauth, 63 user=self.provider.auth_oauth_user, 64 defaults={ 65 "access_token": self.token.get("access_token"), 66 "refresh_token": self.token.get("refresh_token"), 67 "expires": now() + timedelta(seconds=expires_in), 68 "last_updated": now(), 69 }, 70 ) 71 return redirect("authentik_core:if-admin")
class
SCIMOAuthViewMixin:
19class SCIMOAuthViewMixin: 20 21 provider: SCIMProvider 22 23 def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient: 24 source: OAuthSource = self.provider.auth_oauth 25 source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK) 26 if not source_cls.client_class: 27 return super().get_client(source, **kwargs) 28 return source_cls.client_class(source, self.request, **kwargs) 29 30 def _get_scim_provider(self, app_slug: str): 31 app = Application.objects.filter(slug=app_slug).first() 32 if not app: 33 return None 34 provider = SCIMProvider.objects.filter(backchannel_application=app) 35 return provider.first() 36 37 def dispatch(self, request: HttpRequest, application_slug: str): 38 if not request.user.is_authenticated: 39 raise PermissionDenied() 40 provider = self._get_scim_provider(application_slug) 41 if not provider or not provider.auth_oauth: 42 raise PermissionDenied() 43 if not request.user.has_perm( 44 "authentik_providers_scim.change_scimprovider", 45 provider, 46 ): 47 raise PermissionDenied() 48 self.provider = provider 49 return super().dispatch(request, source_slug=provider.auth_oauth.slug)
def
get_client( self, source: authentik.sources.oauth.models.OAuthSource, **kwargs) -> authentik.sources.oauth.clients.base.BaseOAuthClient:
23 def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient: 24 source: OAuthSource = self.provider.auth_oauth 25 source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK) 26 if not source_cls.client_class: 27 return super().get_client(source, **kwargs) 28 return source_cls.client_class(source, self.request, **kwargs)
def
dispatch( self, request: django.http.request.HttpRequest, application_slug: str):
37 def dispatch(self, request: HttpRequest, application_slug: str): 38 if not request.user.is_authenticated: 39 raise PermissionDenied() 40 provider = self._get_scim_provider(application_slug) 41 if not provider or not provider.auth_oauth: 42 raise PermissionDenied() 43 if not request.user.has_perm( 44 "authentik_providers_scim.change_scimprovider", 45 provider, 46 ): 47 raise PermissionDenied() 48 self.provider = provider 49 return super().dispatch(request, source_slug=provider.auth_oauth.slug)
52class SCIMOAuthStart(SCIMOAuthViewMixin, OAuthRedirect): 53 54 def get_callback_url(self, source: OAuthSource): 55 return reverse("authentik_enterprise_providers_scim:callback", kwargs=self.kwargs)
Redirect user to OAuth source to enable access.
class
SCIMRedirectCallback(SCIMOAuthViewMixin, authentik.sources.oauth.views.callback.OAuthCallback):
58class SCIMRedirectCallback(SCIMOAuthViewMixin, OAuthCallback): 59 60 def redirect_flow_manager(self, client: BaseOAuthClient): 61 expires_in = int(self.token.get("expires_in", 0)) 62 UserOAuthSourceConnection.objects.update_or_create( 63 source=self.provider.auth_oauth, 64 user=self.provider.auth_oauth_user, 65 defaults={ 66 "access_token": self.token.get("access_token"), 67 "refresh_token": self.token.get("refresh_token"), 68 "expires": now() + timedelta(seconds=expires_in), 69 "last_updated": now(), 70 }, 71 ) 72 return redirect("authentik_core:if-admin")
Base OAuth callback view.
60 def redirect_flow_manager(self, client: BaseOAuthClient): 61 expires_in = int(self.token.get("expires_in", 0)) 62 UserOAuthSourceConnection.objects.update_or_create( 63 source=self.provider.auth_oauth, 64 user=self.provider.auth_oauth_user, 65 defaults={ 66 "access_token": self.token.get("access_token"), 67 "refresh_token": self.token.get("refresh_token"), 68 "expires": now() + timedelta(seconds=expires_in), 69 "last_updated": now(), 70 }, 71 ) 72 return redirect("authentik_core:if-admin")