authentik.enterprise.providers.scim.views

 1from datetime import timedelta
 2
 3from django.core.exceptions import PermissionDenied
 4from django.http import HttpRequest
 5from django.shortcuts import redirect
 6from django.urls import reverse
 7from django.utils.timezone import now
 8
 9from authentik.core.models import Application
10from authentik.providers.scim.models import SCIMProvider
11from authentik.sources.oauth.clients.base import BaseOAuthClient
12from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection
13from authentik.sources.oauth.types.registry import RequestKind, registry
14from authentik.sources.oauth.views.callback import OAuthCallback
15from authentik.sources.oauth.views.redirect import OAuthRedirect
16
17
18class SCIMOAuthViewMixin:
19
20    provider: SCIMProvider
21
22    def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient:
23        source: OAuthSource = self.provider.auth_oauth
24        source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK)
25        if not source_cls.client_class:
26            return super().get_client(source, **kwargs)
27        return source_cls.client_class(source, self.request, **kwargs)
28
29    def _get_scim_provider(self, app_slug: str):
30        app = Application.objects.filter(slug=app_slug).first()
31        if not app:
32            return None
33        provider = SCIMProvider.objects.filter(backchannel_application=app)
34        return provider.first()
35
36    def dispatch(self, request: HttpRequest, application_slug: str):
37        if not request.user.is_authenticated:
38            raise PermissionDenied()
39        provider = self._get_scim_provider(application_slug)
40        if not provider or not provider.auth_oauth:
41            raise PermissionDenied()
42        if not request.user.has_perm(
43            "authentik_providers_scim.change_scimprovider",
44            provider,
45        ):
46            raise PermissionDenied()
47        self.provider = provider
48        return super().dispatch(request, source_slug=provider.auth_oauth.slug)
49
50
51class SCIMOAuthStart(SCIMOAuthViewMixin, OAuthRedirect):
52
53    def get_callback_url(self, source: OAuthSource):
54        return reverse("authentik_enterprise_providers_scim:callback", kwargs=self.kwargs)
55
56
57class SCIMRedirectCallback(SCIMOAuthViewMixin, OAuthCallback):
58
59    def redirect_flow_manager(self, client: BaseOAuthClient):
60        expires_in = int(self.token.get("expires_in", 0))
61        UserOAuthSourceConnection.objects.update_or_create(
62            source=self.provider.auth_oauth,
63            user=self.provider.auth_oauth_user,
64            defaults={
65                "access_token": self.token.get("access_token"),
66                "refresh_token": self.token.get("refresh_token"),
67                "expires": now() + timedelta(seconds=expires_in),
68                "last_updated": now(),
69            },
70        )
71        return redirect("authentik_core:if-admin")
class SCIMOAuthViewMixin:
19class SCIMOAuthViewMixin:
20
21    provider: SCIMProvider
22
23    def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient:
24        source: OAuthSource = self.provider.auth_oauth
25        source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK)
26        if not source_cls.client_class:
27            return super().get_client(source, **kwargs)
28        return source_cls.client_class(source, self.request, **kwargs)
29
30    def _get_scim_provider(self, app_slug: str):
31        app = Application.objects.filter(slug=app_slug).first()
32        if not app:
33            return None
34        provider = SCIMProvider.objects.filter(backchannel_application=app)
35        return provider.first()
36
37    def dispatch(self, request: HttpRequest, application_slug: str):
38        if not request.user.is_authenticated:
39            raise PermissionDenied()
40        provider = self._get_scim_provider(application_slug)
41        if not provider or not provider.auth_oauth:
42            raise PermissionDenied()
43        if not request.user.has_perm(
44            "authentik_providers_scim.change_scimprovider",
45            provider,
46        ):
47            raise PermissionDenied()
48        self.provider = provider
49        return super().dispatch(request, source_slug=provider.auth_oauth.slug)
23    def get_client(self, source: OAuthSource, **kwargs) -> BaseOAuthClient:
24        source: OAuthSource = self.provider.auth_oauth
25        source_cls = registry.find(source.provider_type, kind=RequestKind.CALLBACK)
26        if not source_cls.client_class:
27            return super().get_client(source, **kwargs)
28        return source_cls.client_class(source, self.request, **kwargs)
def dispatch( self, request: django.http.request.HttpRequest, application_slug: str):
37    def dispatch(self, request: HttpRequest, application_slug: str):
38        if not request.user.is_authenticated:
39            raise PermissionDenied()
40        provider = self._get_scim_provider(application_slug)
41        if not provider or not provider.auth_oauth:
42            raise PermissionDenied()
43        if not request.user.has_perm(
44            "authentik_providers_scim.change_scimprovider",
45            provider,
46        ):
47            raise PermissionDenied()
48        self.provider = provider
49        return super().dispatch(request, source_slug=provider.auth_oauth.slug)
52class SCIMOAuthStart(SCIMOAuthViewMixin, OAuthRedirect):
53
54    def get_callback_url(self, source: OAuthSource):
55        return reverse("authentik_enterprise_providers_scim:callback", kwargs=self.kwargs)

Redirect user to OAuth source to enable access.

def get_callback_url(self, source: authentik.sources.oauth.models.OAuthSource):
54    def get_callback_url(self, source: OAuthSource):
55        return reverse("authentik_enterprise_providers_scim:callback", kwargs=self.kwargs)

Return the callback url for this source.

58class SCIMRedirectCallback(SCIMOAuthViewMixin, OAuthCallback):
59
60    def redirect_flow_manager(self, client: BaseOAuthClient):
61        expires_in = int(self.token.get("expires_in", 0))
62        UserOAuthSourceConnection.objects.update_or_create(
63            source=self.provider.auth_oauth,
64            user=self.provider.auth_oauth_user,
65            defaults={
66                "access_token": self.token.get("access_token"),
67                "refresh_token": self.token.get("refresh_token"),
68                "expires": now() + timedelta(seconds=expires_in),
69                "last_updated": now(),
70            },
71        )
72        return redirect("authentik_core:if-admin")

Base OAuth callback view.

def redirect_flow_manager(self, client: authentik.sources.oauth.clients.base.BaseOAuthClient):
60    def redirect_flow_manager(self, client: BaseOAuthClient):
61        expires_in = int(self.token.get("expires_in", 0))
62        UserOAuthSourceConnection.objects.update_or_create(
63            source=self.provider.auth_oauth,
64            user=self.provider.auth_oauth_user,
65            defaults={
66                "access_token": self.token.get("access_token"),
67                "refresh_token": self.token.get("refresh_token"),
68                "expires": now() + timedelta(seconds=expires_in),
69                "last_updated": now(),
70            },
71        )
72        return redirect("authentik_core:if-admin")