authentik.sources.oauth.views.redirect

OAuth Redirect Views

 1"""OAuth Redirect Views"""
 2
 3from typing import Any
 4
 5from django.http import Http404
 6from django.urls import reverse
 7from django.views.generic import RedirectView
 8from structlog.stdlib import get_logger
 9
10from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
11from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER
12from authentik.flows.views.executor import SESSION_KEY_PLAN
13from authentik.sources.oauth.models import OAuthSource
14from authentik.sources.oauth.views.base import OAuthClientMixin
15
16LOGGER = get_logger()
17
18
class OAuthRedirect(OAuthClientMixin, RedirectView):
    """Redirect user to OAuth source to enable access.

    ``params`` optionally holds extra parameters for the redirect (set on a
    subclass or, presumably, through ``as_view(params=...)``). It is treated as
    read-only: each request works on its own copy, so nothing computed for one
    request is written back into shared state.
    """

    permanent = False
    params = None

    def get_additional_parameters(self, source: OAuthSource) -> dict[str, Any]:
        """Return additional redirect parameters for this source.

        The result is a fresh dict (list values are copied as well), so callers
        may modify it without altering ``self.params``.
        """
        if not self.params:
            return {}
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in self.params.items()
        }

    def get_callback_url(self, source: OAuthSource) -> str:
        """Return the callback url for this source."""
        return reverse(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": source.slug},
        )

    def _try_login_hint_extract(self) -> dict[str, str]:
        """Build a ``login_hint`` parameter from the flow plan stored in the session.

        Returns an empty dict when the session holds no flow plan. If the plan
        context contains both a pending user and a pending-user identifier, the
        identifier wins because it is applied last.
        """
        params = {}
        plan: FlowPlan | None = self.request.session.get(SESSION_KEY_PLAN, None)
        if not plan:
            return params
        if user := plan.context.get(PLAN_CONTEXT_PENDING_USER):
            params["login_hint"] = user.email
        if identifier := plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER):
            params["login_hint"] = identifier
        return params

    def get_redirect_url(self, **kwargs) -> str:
        """Build redirect url for a given source.

        The source is looked up by the ``source_slug`` keyword argument.

        Raises:
            Http404: if no source with that slug exists, or the source is
                not enabled.
        """
        slug = kwargs.get("source_slug", "")
        try:
            source: OAuthSource = OAuthSource.objects.get(slug=slug)
        except OAuthSource.DoesNotExist:
            raise Http404(f"Unknown OAuth source '{slug}'.") from None
        if not source.enabled:
            raise Http404(f"source {slug} is not enabled.")
        client = self.get_client(source, callback=self.get_callback_url(source))
        # Copy defensively: an overriding get_additional_parameters may return a
        # shared dict, and everything below modifies the parameters.
        params = dict(self.get_additional_parameters(source))
        params.setdefault("scope", [])
        additional_scopes = source.additional_scopes
        if additional_scopes != "":
            # split() without an argument ignores repeated/leading/trailing
            # whitespace, so no empty scope names are produced.
            if additional_scopes.startswith("*"):
                # A leading "*" replaces the scopes instead of extending them.
                params["scope"] = additional_scopes[1:].split()
            else:
                # Build a new list rather than extending the existing one in place.
                params["scope"] = [*params["scope"], *additional_scopes.split()]
        params.update(self._try_login_hint_extract())
        return client.get_redirect_url(params)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class OAuthRedirect(authentik.sources.oauth.views.base.OAuthClientMixin, django.views.generic.base.RedirectView):
class OAuthRedirect(OAuthClientMixin, RedirectView):
    """Redirect user to OAuth source to enable access.

    ``params`` optionally holds extra parameters for the redirect (set on a
    subclass or, presumably, through ``as_view(params=...)``). It is treated as
    read-only: each request works on its own copy, so nothing computed for one
    request is written back into shared state.
    """

    permanent = False
    params = None

    def get_additional_parameters(self, source: OAuthSource) -> dict[str, Any]:
        """Return additional redirect parameters for this source.

        The result is a fresh dict (list values are copied as well), so callers
        may modify it without altering ``self.params``.
        """
        if not self.params:
            return {}
        return {
            key: list(value) if isinstance(value, list) else value
            for key, value in self.params.items()
        }

    def get_callback_url(self, source: OAuthSource) -> str:
        """Return the callback url for this source."""
        return reverse(
            "authentik_sources_oauth:oauth-client-callback",
            kwargs={"source_slug": source.slug},
        )

    def _try_login_hint_extract(self) -> dict[str, str]:
        """Build a ``login_hint`` parameter from the flow plan stored in the session.

        Returns an empty dict when the session holds no flow plan. If the plan
        context contains both a pending user and a pending-user identifier, the
        identifier wins because it is applied last.
        """
        params = {}
        plan: FlowPlan | None = self.request.session.get(SESSION_KEY_PLAN, None)
        if not plan:
            return params
        if user := plan.context.get(PLAN_CONTEXT_PENDING_USER):
            params["login_hint"] = user.email
        if identifier := plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER):
            params["login_hint"] = identifier
        return params

    def get_redirect_url(self, **kwargs) -> str:
        """Build redirect url for a given source.

        The source is looked up by the ``source_slug`` keyword argument.

        Raises:
            Http404: if no source with that slug exists, or the source is
                not enabled.
        """
        slug = kwargs.get("source_slug", "")
        try:
            source: OAuthSource = OAuthSource.objects.get(slug=slug)
        except OAuthSource.DoesNotExist:
            raise Http404(f"Unknown OAuth source '{slug}'.") from None
        if not source.enabled:
            raise Http404(f"source {slug} is not enabled.")
        client = self.get_client(source, callback=self.get_callback_url(source))
        # Copy defensively: an overriding get_additional_parameters may return a
        # shared dict, and everything below modifies the parameters.
        params = dict(self.get_additional_parameters(source))
        params.setdefault("scope", [])
        additional_scopes = source.additional_scopes
        if additional_scopes != "":
            # split() without an argument ignores repeated/leading/trailing
            # whitespace, so no empty scope names are produced.
            if additional_scopes.startswith("*"):
                # A leading "*" replaces the scopes instead of extending them.
                params["scope"] = additional_scopes[1:].split()
            else:
                # Build a new list rather than extending the existing one in place.
                params["scope"] = [*params["scope"], *additional_scopes.split()]
        params.update(self._try_login_hint_extract())
        return client.get_redirect_url(params)

Redirect user to OAuth source to enable access.

permanent = False
params = None
def get_additional_parameters(self, source: authentik.sources.oauth.models.OAuthSource) -> dict[str, typing.Any]:
26    def get_additional_parameters(self, source: OAuthSource) -> dict[str, Any]:
27        "Return additional redirect parameters for this source."
28        return self.params or {}

Return additional redirect parameters for this source.

def get_callback_url(self, source: authentik.sources.oauth.models.OAuthSource) -> str:
30    def get_callback_url(self, source: OAuthSource) -> str:
31        "Return the callback url for this source."
32        return reverse(
33            "authentik_sources_oauth:oauth-client-callback",
34            kwargs={"source_slug": source.slug},
35        )

Return the callback url for this source.

def get_redirect_url(self, **kwargs) -> str:
50    def get_redirect_url(self, **kwargs) -> str:
51        "Build redirect url for a given source."
52        slug = kwargs.get("source_slug", "")
53        try:
54            source: OAuthSource = OAuthSource.objects.get(slug=slug)
55        except OAuthSource.DoesNotExist:
56            raise Http404(f"Unknown OAuth source '{slug}'.") from None
57        if not source.enabled:
58            raise Http404(f"source {slug} is not enabled.")
59        client = self.get_client(source, callback=self.get_callback_url(source))
60        params = self.get_additional_parameters(source)
61        params.setdefault("scope", [])
62        if source.additional_scopes != "":
63            if source.additional_scopes.startswith("*"):
64                params["scope"] = source.additional_scopes[1:].split(" ")
65            else:
66                params["scope"] += source.additional_scopes.split(" ")
67        params.update(self._try_login_hint_extract())
68        return client.get_redirect_url(params)

Build redirect url for a given source.