authentik.providers.oauth2.views.authorize

authentik OAuth2 Authorization views

  1"""authentik OAuth2 Authorization views"""
  2
  3from dataclasses import InitVar, dataclass, field
  4from datetime import timedelta
  5from json import dumps
  6from re import error as RegexError
  7from re import fullmatch
  8from typing import Any
  9from urllib.parse import parse_qs, quote, urlencode, urlparse, urlsplit, urlunparse, urlunsplit
 10from uuid import uuid4
 11
 12from django.conf import settings
 13from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
 14from django.http.response import Http404, HttpResponseBadRequest
 15from django.shortcuts import get_object_or_404
 16from django.utils import timezone, translation
 17from django.utils.translation import gettext as _
 18from structlog.stdlib import get_logger
 19
 20from authentik.common.oauth.constants import (
 21    FORBIDDEN_URI_SCHEMES,
 22    PKCE_METHOD_PLAIN,
 23    PKCE_METHOD_S256,
 24    PROMPT_CONSENT,
 25    PROMPT_LOGIN,
 26    PROMPT_NONE,
 27    QS_LOGIN_HINT,
 28    SCOPE_GITHUB,
 29    SCOPE_OFFLINE_ACCESS,
 30    SCOPE_OPENID,
 31    TOKEN_TYPE,
 32    UI_LOCALES,
 33)
 34from authentik.core.models import Application
 35from authentik.events.models import Event, EventAction
 36from authentik.events.signals import get_login_event
 37from authentik.flows.challenge import (
 38    PLAN_CONTEXT_TITLE,
 39    AutosubmitChallenge,
 40    HttpChallengeResponse,
 41)
 42from authentik.flows.exceptions import FlowNonApplicableException
 43from authentik.flows.models import Flow, in_memory_stage
 44from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
 45from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
 46from authentik.lib.utils.time import timedelta_from_string
 47from authentik.lib.views import bad_request_message
 48from authentik.policies.types import PolicyRequest
 49from authentik.policies.views import PolicyAccessView, RequestValidationError
 50from authentik.providers.oauth2.errors import (
 51    AuthorizeError,
 52    ClientIdError,
 53    OAuth2Error,
 54    RedirectUriError,
 55)
 56from authentik.providers.oauth2.id_token import IDToken
 57from authentik.providers.oauth2.models import (
 58    AccessToken,
 59    AuthorizationCode,
 60    GrantType,
 61    OAuth2Provider,
 62    RedirectURIMatchingMode,
 63    ResponseMode,
 64    ResponseTypes,
 65    ScopeMapping,
 66)
 67from authentik.providers.oauth2.utils import HttpResponseRedirectScheme
 68from authentik.providers.oauth2.views.userinfo import UserInfoView
 69from authentik.stages.consent.models import ConsentMode, ConsentStage
 70from authentik.stages.consent.stage import (
 71    PLAN_CONTEXT_CONSENT_HEADER,
 72    PLAN_CONTEXT_CONSENT_PERMISSIONS,
 73)
 74
 75LOGGER = get_logger()
 76
 77PLAN_CONTEXT_PARAMS = "goauthentik.io/providers/oauth2/params"
 78SESSION_KEY_LAST_LOGIN_UID = "authentik/providers/oauth2/last_login_uid"
 79
 80ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSENT, PROMPT_LOGIN}
 81
 82
 83@dataclass(slots=True)
 84class OAuthAuthorizationParams:
 85    """Parameters required to authorize an OAuth Client"""
 86
 87    client_id: str
 88    redirect_uri: str
 89    response_type: str
 90    response_mode: str | None
 91    scope: set[str]
 92    state: str
 93    nonce: str | None
 94    prompt: set[str]
 95    grant_type: str
 96
 97    provider: OAuth2Provider = field(default_factory=OAuth2Provider)
 98
 99    request: str | None = None
100
101    max_age: int | None = None
102
103    code_challenge: str | None = None
104    code_challenge_method: str | None = None
105
106    github_compat: InitVar[bool] = False
107
108    @staticmethod
109    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
110        """
111        Get all the params used by the Authorization Code Flow
112        (and also for the Implicit and Hybrid).
113
114        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
115        """
116        # Because in this endpoint we handle both GET
117        # and POST request.
118        query_dict = request.POST if request.method == "POST" else request.GET
119        state = query_dict.get("state")
120        redirect_uri = query_dict.get("redirect_uri", "")
121
122        response_type = query_dict.get("response_type", "")
123
124        # Validate and check the response_mode against the predefined dict
125        # Set to Query or Fragment if not defined in request
126        response_mode = query_dict.get("response_mode", False)
127
128        max_age = query_dict.get("max_age")
129        return OAuthAuthorizationParams(
130            client_id=query_dict.get("client_id", ""),
131            redirect_uri=redirect_uri,
132            response_type=response_type,
133            response_mode=response_mode,
134            grant_type="",
135            scope=set(query_dict.get("scope", "").split()),
136            state=state,
137            nonce=query_dict.get("nonce"),
138            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
139            request=query_dict.get("request", None),
140            max_age=int(max_age) if max_age else None,
141            code_challenge=query_dict.get("code_challenge"),
142            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
143            github_compat=github_compat,
144        )
145
146    def __post_init__(self, github_compat=False):
147        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
148            client_id=self.client_id
149        ).first()
150        if not self.provider:
151            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
152            raise ClientIdError(client_id=self.client_id)
153        self.check_redirect_uri()
154        self.check_grant()
155        self.check_scope(github_compat)
156        if self.request:
157            raise AuthorizeError(
158                self.redirect_uri, "request_not_supported", self.grant_type, self.state
159            )
160        self.check_nonce()
161        self.check_code_challenge()
162
163    def check_grant(self):
164        """Check grant"""
165        # Determine which flow to use.
166        if self.response_type in [ResponseTypes.CODE]:
167            self.grant_type = GrantType.AUTHORIZATION_CODE
168        elif self.response_type in [
169            ResponseTypes.ID_TOKEN,
170            ResponseTypes.ID_TOKEN_TOKEN,
171        ]:
172            self.grant_type = GrantType.IMPLICIT
173        elif self.response_type in [
174            ResponseTypes.CODE_TOKEN,
175            ResponseTypes.CODE_ID_TOKEN,
176            ResponseTypes.CODE_ID_TOKEN_TOKEN,
177        ]:
178            self.grant_type = GrantType.HYBRID
179        # Grant type validation.
180        if not self.grant_type:
181            LOGGER.warning("Invalid response type", type=self.response_type)
182            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
183
184        if self.grant_type not in self.provider.grant_types:
185            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
186            raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)
187
188        if self.response_mode not in ResponseMode.values:
189            self.response_mode = ResponseMode.QUERY
190
191            if self.grant_type in [GrantType.IMPLICIT, GrantType.HYBRID]:
192                self.response_mode = ResponseMode.FRAGMENT
193
194    def check_redirect_uri(self):
195        """Redirect URI validation."""
196        allowed_redirect_urls = self.provider.authorization_redirect_uris
197        if not self.redirect_uri:
198            LOGGER.warning("Missing redirect uri.")
199            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
200
201        match_found = False
202        for allowed in allowed_redirect_urls:
203            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
204                if self.redirect_uri == allowed.url:
205                    match_found = True
206                    break
207            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
208                try:
209                    if fullmatch(allowed.url, self.redirect_uri):
210                        match_found = True
211                        break
212                except RegexError as exc:
213                    LOGGER.warning(
214                        "Failed to parse regular expression",
215                        exc=exc,
216                        url=allowed.url,
217                        provider=self.provider,
218                    )
219        if not match_found:
220            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
221                "redirect_uri_no_match"
222            )
223        # Check against forbidden schemes
224        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
225            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
226                "redirect_uri_forbidden_scheme"
227            )
228
229    def check_scope(self, github_compat=False):
230        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
231        default_scope_names = set(
232            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
233                "scope_name", flat=True
234            )
235        )
236        if len(self.scope) == 0:
237            self.scope = default_scope_names
238            LOGGER.info(
239                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
240            )
241        scopes_to_check = self.scope
242        if github_compat:
243            scopes_to_check = self.scope - SCOPE_GITHUB
244        if not scopes_to_check.issubset(default_scope_names):
245            LOGGER.info(
246                "Application requested scopes not configured, setting to overlap",
247                scope_allowed=default_scope_names,
248                scope_given=self.scope,
249            )
250            self.scope = self.scope.intersection(default_scope_names)
251        if SCOPE_OPENID not in self.scope and (
252            self.grant_type == GrantType.HYBRID
253            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
254        ):
255            LOGGER.warning("Missing 'openid' scope.")
256            raise AuthorizeError(
257                self.redirect_uri, "invalid_scope", self.grant_type, self.state
258            ).with_cause("scope_openid_missing")
259        if SCOPE_OFFLINE_ACCESS in self.scope:
260            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
261            # Don't explicitly request consent with offline_access, as the spec allows for
262            # "other conditions for processing the request permitting offline access to the
263            # requested resources are in place"
264            # which we interpret as "the admin picks an authorization flow with or without consent"
265            if self.response_type not in [
266                ResponseTypes.CODE,
267                ResponseTypes.CODE_TOKEN,
268                ResponseTypes.CODE_ID_TOKEN,
269                ResponseTypes.CODE_ID_TOKEN_TOKEN,
270            ]:
271                # offline_access requires a response type that has some sort of token
272                # Spec says to ignore the scope when the response_type wouldn't result
273                # in an authorization code being generated
274                self.scope.remove(SCOPE_OFFLINE_ACCESS)
275
276    def check_nonce(self):
277        """Nonce parameter validation."""
278        # nonce is required for all flows that return an id_token from the authorization endpoint,
279        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
280        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
281        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
282        if self.response_type not in [
283            ResponseTypes.ID_TOKEN,
284            ResponseTypes.ID_TOKEN_TOKEN,
285            ResponseTypes.CODE_ID_TOKEN,
286            ResponseTypes.CODE_ID_TOKEN_TOKEN,
287        ]:
288            return
289        if SCOPE_OPENID not in self.scope:
290            return
291        if not self.nonce:
292            LOGGER.warning("Missing nonce for OpenID Request")
293            raise AuthorizeError(
294                self.redirect_uri, "invalid_request", self.grant_type, self.state
295            ).with_cause("nonce_missing")
296
297    def check_code_challenge(self):
298        """PKCE validation of the transformation method."""
299        if self.code_challenge and self.code_challenge_method not in [
300            PKCE_METHOD_PLAIN,
301            PKCE_METHOD_S256,
302        ]:
303            raise AuthorizeError(
304                self.redirect_uri,
305                "invalid_request",
306                self.grant_type,
307                self.state,
308                f"Unsupported challenge method {self.code_challenge_method}",
309            )
310
311    def create_code(self, request: HttpRequest) -> AuthorizationCode:
312        """Create an AuthorizationCode object for the request"""
313        auth_event = get_login_event(request)
314
315        now = timezone.now()
316
317        code = AuthorizationCode(
318            user=request.user,
319            provider=self.provider,
320            auth_time=auth_event.created if auth_event else now,
321            code=uuid4().hex,
322            expires=now + timedelta_from_string(self.provider.access_code_validity),
323            scope=self.scope,
324            nonce=self.nonce,
325            session=request.session["authenticatedsession"],
326        )
327
328        if self.code_challenge and self.code_challenge_method:
329            code.code_challenge = self.code_challenge
330            code.code_challenge_method = self.code_challenge_method
331
332        return code
333
334
335class AuthorizationFlowInitView(PolicyAccessView):
336    """OAuth2 Flow initializer, checks access to application and starts flow"""
337
338    params: OAuthAuthorizationParams
339    # Enable GitHub compatibility (only allow for scopes which are handled
340    # differently for github compat)
341    github_compat = False
342
343    def pre_permission_check(self):
344        """Check prompt parameter before checking permission/authentication,
345        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
346        # Quick sanity check at the beginning to prevent event spamming
347        if len(self.request.GET) < 1:
348            raise Http404
349        try:
350            self.params = OAuthAuthorizationParams.from_request(
351                self.request, github_compat=self.github_compat
352            )
353        except AuthorizeError as error:
354            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
355            raise RequestValidationError(error.get_response(self.request)) from None
356        except OAuth2Error as error:
357            LOGGER.warning(error.description, cause=error.cause)
358            raise RequestValidationError(
359                bad_request_message(self.request, error.description, title=error.error)
360            ) from None
361        except OAuth2Provider.DoesNotExist:
362            raise Http404 from None
363        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
364            # When "prompt" is set to "none" but the user is not logged in, show an error message
365            error = AuthorizeError(
366                self.params.redirect_uri,
367                "login_required",
368                self.params.grant_type,
369                self.params.state,
370            )
371            raise RequestValidationError(error.get_response(self.request))
372
373    def resolve_provider_application(self):
374        client_id = self.request.GET.get("client_id")
375        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
376        self.application = self.provider.application
377
378    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
379        if QS_LOGIN_HINT in self.request.GET:
380            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
381        return super().modify_flow_context(flow, context)
382
383    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
384        request.context["oauth_scopes"] = self.params.scope
385        request.context["oauth_grant_type"] = self.params.grant_type
386        request.context["oauth_code_challenge"] = self.params.code_challenge
387        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
388        request.context["oauth_max_age"] = self.params.max_age
389        request.context["oauth_redirect_uri"] = self.params.redirect_uri
390        request.context["oauth_response_type"] = self.params.response_type
391        return request
392
393    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
394        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
395        available"""
396        selected_language = None
397        if UI_LOCALES in self.request.GET:
398            languages = str(self.request.GET[UI_LOCALES]).split(" ")
399            for language in languages:
400                if translation.check_for_language(language):
401                    selected_language = translation.get_supported_language_variant(language)
402                    LOGGER.debug(
403                        "Activating language from oidc ui_locales", locale=selected_language
404                    )
405                    break
406            translation.activate(selected_language)
407        response = super().dispatch(request, *args, **kwargs)
408        if selected_language:
409            response.set_cookie(
410                settings.LANGUAGE_COOKIE_NAME,
411                selected_language,
412                max_age=settings.LANGUAGE_COOKIE_AGE,
413                path=settings.LANGUAGE_COOKIE_PATH,
414                domain=settings.LANGUAGE_COOKIE_DOMAIN,
415                secure=settings.LANGUAGE_COOKIE_SECURE,
416                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
417                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
418            )
419            if isinstance(response, HttpResponseRedirect):
420                parsed_url = urlparse(response.url)
421                args = parse_qs(parsed_url.query)
422                args["locale"] = selected_language
423                response["Location"] = urlunparse(
424                    parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
425                )
426        return response
427
428    def dispatch(self, request: HttpRequest, *args, **kwargs):
429        # Activate language before parsing params (error messages should be localized)
430        return self.dispatch_with_language(request, *args, **kwargs)
431
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell

        Before planning, the user is sent back through ``handle_no_permission``
        when there is no login event, when the login is older than ``max_age``,
        or when ``prompt=login`` was requested and no new login happened yet.
        Otherwise the provider's authorization flow is planned, an in-memory
        consent stage is added for ``prompt=consent`` if the plan has none, and
        the fulfillment stage is appended as the final stage.
        """
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        # NOTE: a max_age of 0 is falsy and therefore skips this check
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # Descriptions of the granted scopes, passed to the consent stage below
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        # The fulfillment stage is always last; it builds the response for the client
        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            #  we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )
514
515
516class OAuthFulfillmentStage(StageView):
517    """Final stage, restores params from Flow."""
518
519    params: OAuthAuthorizationParams
520    provider: OAuth2Provider
521    application: Application
522
523    def redirect(self, uri: str) -> HttpResponse:
524        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes"""
525        parsed = urlparse(uri)
526
527        if self.params.response_mode == ResponseMode.FORM_POST:
528            # parse_qs returns a dictionary with values wrapped in lists, however
529            # we need a flat dictionary for the autosubmit challenge
530
531            # this picks the first item in the list if the value is a list,
532            # otherwise just the value as-is
533            query_params = dict(
534                (k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed.query).items()
535            )
536
537            challenge = AutosubmitChallenge(
538                data={
539                    "component": "ak-stage-autosubmit",
540                    "title": self.executor.plan.context.get(
541                        PLAN_CONTEXT_TITLE,
542                        _("Redirecting to {app}...".format_map({"app": self.application.name})),
543                    ),
544                    "url": self.params.redirect_uri,
545                    "attrs": query_params,
546                }
547            )
548
549            challenge.is_valid()
550            self.executor.stage_ok()
551            return HttpChallengeResponse(
552                challenge=challenge,
553            )
554        self.executor.stage_ok()
555        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])
556
557    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
558        """Wrapper when this stage gets hit with a post request"""
559        return self.get(request, *args, **kwargs)
560
561    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
562        """final Stage of an OAuth2 Flow"""
563        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
564            LOGGER.warning("Got to fulfillment stage with no pending context")
565            return HttpResponseBadRequest()
566        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
567        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
568        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
569        try:
570            # At this point we don't need to check permissions anymore
571            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
572                raise AuthorizeError(
573                    self.params.redirect_uri,
574                    "consent_required",
575                    self.params.grant_type,
576                    self.params.state,
577                )
578            Event.new(
579                EventAction.AUTHORIZE_APPLICATION,
580                authorized_application=self.application,
581                flow=self.executor.plan.flow_pk,
582                scopes=" ".join(self.params.scope),
583            ).from_http(self.request)
584            return self.redirect(self.create_response_uri())
585        except (ClientIdError, RedirectUriError) as error:
586            error.to_event(application=self.application).from_http(request)
587            self.executor.stage_invalid()
588
589            return bad_request_message(request, error.description, title=error.error)
590        except AuthorizeError as error:
591            error.to_event(application=self.application).from_http(request)
592            self.executor.stage_invalid()
593            return error.get_response(self.request)
594
595    def create_response_uri(self) -> str:
596        """Create a final Response URI the user is redirected to."""
597        uri = urlsplit(self.params.redirect_uri)
598
599        try:
600            code = None
601
602            if self.params.grant_type in [
603                GrantType.AUTHORIZATION_CODE,
604                GrantType.HYBRID,
605            ]:
606                code = self.params.create_code(self.request)
607                code.save()
608
609            if self.params.response_mode == ResponseMode.QUERY:
610                query_params = parse_qs(uri.query)
611                query_params["code"] = code.code
612                query_params["state"] = [str(self.params.state) if self.params.state else ""]
613
614                uri = uri._replace(query=urlencode(query_params, doseq=True))
615                return urlunsplit(uri)
616
617            if self.params.response_mode == ResponseMode.FRAGMENT:
618                query_fragment = {}
619                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
620                    query_fragment["code"] = code.code
621                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
622                else:
623                    query_fragment = self.create_implicit_response(code)
624
625                uri = uri._replace(
626                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
627                )
628
629                return urlunsplit(uri)
630
631            if self.params.response_mode == ResponseMode.FORM_POST:
632                post_params = {}
633                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
634                    post_params["code"] = code.code
635                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
636                else:
637                    post_params = self.create_implicit_response(code)
638
639                uri = uri._replace(query=urlencode(post_params, doseq=True))
640
641                return urlunsplit(uri)
642
643            raise OAuth2Error()
644        except OAuth2Error as error:
645            LOGGER.warning("Error when trying to create response uri", error=error)
646            raise AuthorizeError(
647                self.params.redirect_uri,
648                "server_error",
649                self.params.grant_type,
650                self.params.state,
651            ) from None
652
    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary

        Creates and saves an AccessToken for the current user and builds an
        IDToken for it. Depending on the requested response type the returned
        dictionary contains ``access_token`` and/or ``id_token``, plus ``code``
        for the hybrid grant, and always ``token_type``, ``expires_in`` and
        ``state``.

        ``code`` is only dereferenced for the hybrid response types and the
        hybrid grant type, so it may be None otherwise.
        """
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            # Without a login event, the current time is used as authentication time
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        # Response types that return both a code and an id_token get the code's
        # hash set on the id_token
        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        # This runs after the hashes above were set, so the encoded id_token
        # includes them.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
            token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantType.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        # Lifetime of the access token in whole seconds
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PARAMS = 'goauthentik.io/providers/oauth2/params'
SESSION_KEY_LAST_LOGIN_UID = 'authentik/providers/oauth2/last_login_uid'
ALLOWED_PROMPT_PARAMS = {'consent', 'none', 'login'}
@dataclass(slots=True)
class OAuthAuthorizationParams:
 84@dataclass(slots=True)
 85class OAuthAuthorizationParams:
 86    """Parameters required to authorize an OAuth Client"""
 87
 88    client_id: str
 89    redirect_uri: str
 90    response_type: str
 91    response_mode: str | None
 92    scope: set[str]
 93    state: str
 94    nonce: str | None
 95    prompt: set[str]
 96    grant_type: str
 97
 98    provider: OAuth2Provider = field(default_factory=OAuth2Provider)
 99
100    request: str | None = None
101
102    max_age: int | None = None
103
104    code_challenge: str | None = None
105    code_challenge_method: str | None = None
106
107    github_compat: InitVar[bool] = False
108
109    @staticmethod
110    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
111        """
112        Get all the params used by the Authorization Code Flow
113        (and also for the Implicit and Hybrid).
114
115        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
116        """
117        # Because in this endpoint we handle both GET
118        # and POST request.
119        query_dict = request.POST if request.method == "POST" else request.GET
120        state = query_dict.get("state")
121        redirect_uri = query_dict.get("redirect_uri", "")
122
123        response_type = query_dict.get("response_type", "")
124
125        # Validate and check the response_mode against the predefined dict
126        # Set to Query or Fragment if not defined in request
127        response_mode = query_dict.get("response_mode", False)
128
129        max_age = query_dict.get("max_age")
130        return OAuthAuthorizationParams(
131            client_id=query_dict.get("client_id", ""),
132            redirect_uri=redirect_uri,
133            response_type=response_type,
134            response_mode=response_mode,
135            grant_type="",
136            scope=set(query_dict.get("scope", "").split()),
137            state=state,
138            nonce=query_dict.get("nonce"),
139            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
140            request=query_dict.get("request", None),
141            max_age=int(max_age) if max_age else None,
142            code_challenge=query_dict.get("code_challenge"),
143            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
144            github_compat=github_compat,
145        )
146
147    def __post_init__(self, github_compat=False):
148        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
149            client_id=self.client_id
150        ).first()
151        if not self.provider:
152            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
153            raise ClientIdError(client_id=self.client_id)
154        self.check_redirect_uri()
155        self.check_grant()
156        self.check_scope(github_compat)
157        if self.request:
158            raise AuthorizeError(
159                self.redirect_uri, "request_not_supported", self.grant_type, self.state
160            )
161        self.check_nonce()
162        self.check_code_challenge()
163
164    def check_grant(self):
165        """Check grant"""
166        # Determine which flow to use.
167        if self.response_type in [ResponseTypes.CODE]:
168            self.grant_type = GrantType.AUTHORIZATION_CODE
169        elif self.response_type in [
170            ResponseTypes.ID_TOKEN,
171            ResponseTypes.ID_TOKEN_TOKEN,
172        ]:
173            self.grant_type = GrantType.IMPLICIT
174        elif self.response_type in [
175            ResponseTypes.CODE_TOKEN,
176            ResponseTypes.CODE_ID_TOKEN,
177            ResponseTypes.CODE_ID_TOKEN_TOKEN,
178        ]:
179            self.grant_type = GrantType.HYBRID
180        # Grant type validation.
181        if not self.grant_type:
182            LOGGER.warning("Invalid response type", type=self.response_type)
183            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
184
185        if self.grant_type not in self.provider.grant_types:
186            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
187            raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)
188
189        if self.response_mode not in ResponseMode.values:
190            self.response_mode = ResponseMode.QUERY
191
192            if self.grant_type in [GrantType.IMPLICIT, GrantType.HYBRID]:
193                self.response_mode = ResponseMode.FRAGMENT
194
195    def check_redirect_uri(self):
196        """Redirect URI validation."""
197        allowed_redirect_urls = self.provider.authorization_redirect_uris
198        if not self.redirect_uri:
199            LOGGER.warning("Missing redirect uri.")
200            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
201
202        match_found = False
203        for allowed in allowed_redirect_urls:
204            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
205                if self.redirect_uri == allowed.url:
206                    match_found = True
207                    break
208            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
209                try:
210                    if fullmatch(allowed.url, self.redirect_uri):
211                        match_found = True
212                        break
213                except RegexError as exc:
214                    LOGGER.warning(
215                        "Failed to parse regular expression",
216                        exc=exc,
217                        url=allowed.url,
218                        provider=self.provider,
219                    )
220        if not match_found:
221            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
222                "redirect_uri_no_match"
223            )
224        # Check against forbidden schemes
225        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
226            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
227                "redirect_uri_forbidden_scheme"
228            )
229
230    def check_scope(self, github_compat=False):
231        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
232        default_scope_names = set(
233            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
234                "scope_name", flat=True
235            )
236        )
237        if len(self.scope) == 0:
238            self.scope = default_scope_names
239            LOGGER.info(
240                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
241            )
242        scopes_to_check = self.scope
243        if github_compat:
244            scopes_to_check = self.scope - SCOPE_GITHUB
245        if not scopes_to_check.issubset(default_scope_names):
246            LOGGER.info(
247                "Application requested scopes not configured, setting to overlap",
248                scope_allowed=default_scope_names,
249                scope_given=self.scope,
250            )
251            self.scope = self.scope.intersection(default_scope_names)
252        if SCOPE_OPENID not in self.scope and (
253            self.grant_type == GrantType.HYBRID
254            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
255        ):
256            LOGGER.warning("Missing 'openid' scope.")
257            raise AuthorizeError(
258                self.redirect_uri, "invalid_scope", self.grant_type, self.state
259            ).with_cause("scope_openid_missing")
260        if SCOPE_OFFLINE_ACCESS in self.scope:
261            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
262            # Don't explicitly request consent with offline_access, as the spec allows for
263            # "other conditions for processing the request permitting offline access to the
264            # requested resources are in place"
265            # which we interpret as "the admin picks an authorization flow with or without consent"
266            if self.response_type not in [
267                ResponseTypes.CODE,
268                ResponseTypes.CODE_TOKEN,
269                ResponseTypes.CODE_ID_TOKEN,
270                ResponseTypes.CODE_ID_TOKEN_TOKEN,
271            ]:
272                # offline_access requires a response type that has some sort of token
273                # Spec says to ignore the scope when the response_type wouldn't result
274                # in an authorization code being generated
275                self.scope.remove(SCOPE_OFFLINE_ACCESS)
276
277    def check_nonce(self):
278        """Nonce parameter validation."""
279        # nonce is required for all flows that return an id_token from the authorization endpoint,
280        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
281        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
282        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
283        if self.response_type not in [
284            ResponseTypes.ID_TOKEN,
285            ResponseTypes.ID_TOKEN_TOKEN,
286            ResponseTypes.CODE_ID_TOKEN,
287            ResponseTypes.CODE_ID_TOKEN_TOKEN,
288        ]:
289            return
290        if SCOPE_OPENID not in self.scope:
291            return
292        if not self.nonce:
293            LOGGER.warning("Missing nonce for OpenID Request")
294            raise AuthorizeError(
295                self.redirect_uri, "invalid_request", self.grant_type, self.state
296            ).with_cause("nonce_missing")
297
298    def check_code_challenge(self):
299        """PKCE validation of the transformation method."""
300        if self.code_challenge and self.code_challenge_method not in [
301            PKCE_METHOD_PLAIN,
302            PKCE_METHOD_S256,
303        ]:
304            raise AuthorizeError(
305                self.redirect_uri,
306                "invalid_request",
307                self.grant_type,
308                self.state,
309                f"Unsupported challenge method {self.code_challenge_method}",
310            )
311
312    def create_code(self, request: HttpRequest) -> AuthorizationCode:
313        """Create an AuthorizationCode object for the request"""
314        auth_event = get_login_event(request)
315
316        now = timezone.now()
317
318        code = AuthorizationCode(
319            user=request.user,
320            provider=self.provider,
321            auth_time=auth_event.created if auth_event else now,
322            code=uuid4().hex,
323            expires=now + timedelta_from_string(self.provider.access_code_validity),
324            scope=self.scope,
325            nonce=self.nonce,
326            session=request.session["authenticatedsession"],
327        )
328
329        if self.code_challenge and self.code_challenge_method:
330            code.code_challenge = self.code_challenge
331            code.code_challenge_method = self.code_challenge_method
332
333        return code

Parameters required to authorize an OAuth Client

OAuthAuthorizationParams( client_id: str, redirect_uri: str, response_type: str, response_mode: str | None, scope: set[str], state: str, nonce: str | None, prompt: set[str], grant_type: str, provider: authentik.providers.oauth2.models.OAuth2Provider = <factory>, request: str | None = None, max_age: int | None = None, code_challenge: str | None = None, code_challenge_method: str | None = None, github_compat: dataclasses.InitVar[bool] = False)
client_id: str
redirect_uri: str
response_type: str
response_mode: str | None
scope: set[str]
state: str
nonce: str | None
prompt: set[str]
grant_type: str
request: str | None
max_age: int | None
code_challenge: str | None
code_challenge_method: str | None
github_compat: dataclasses.InitVar[bool] = False
@staticmethod
def from_request( request: django.http.request.HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
109    @staticmethod
110    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
111        """
112        Get all the params used by the Authorization Code Flow
113        (and also for the Implicit and Hybrid).
114
115        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
116        """
117        # Because in this endpoint we handle both GET
118        # and POST request.
119        query_dict = request.POST if request.method == "POST" else request.GET
120        state = query_dict.get("state")
121        redirect_uri = query_dict.get("redirect_uri", "")
122
123        response_type = query_dict.get("response_type", "")
124
125        # Validate and check the response_mode against the predefined dict
126        # Set to Query or Fragment if not defined in request
127        response_mode = query_dict.get("response_mode", False)
128
129        max_age = query_dict.get("max_age")
130        return OAuthAuthorizationParams(
131            client_id=query_dict.get("client_id", ""),
132            redirect_uri=redirect_uri,
133            response_type=response_type,
134            response_mode=response_mode,
135            grant_type="",
136            scope=set(query_dict.get("scope", "").split()),
137            state=state,
138            nonce=query_dict.get("nonce"),
139            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
140            request=query_dict.get("request", None),
141            max_age=int(max_age) if max_age else None,
142            code_challenge=query_dict.get("code_challenge"),
143            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
144            github_compat=github_compat,
145        )

Get all the params used by the Authorization Code Flow (and also for the Implicit and Hybrid).

See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest

def check_grant(self):
164    def check_grant(self):
165        """Check grant"""
166        # Determine which flow to use.
167        if self.response_type in [ResponseTypes.CODE]:
168            self.grant_type = GrantType.AUTHORIZATION_CODE
169        elif self.response_type in [
170            ResponseTypes.ID_TOKEN,
171            ResponseTypes.ID_TOKEN_TOKEN,
172        ]:
173            self.grant_type = GrantType.IMPLICIT
174        elif self.response_type in [
175            ResponseTypes.CODE_TOKEN,
176            ResponseTypes.CODE_ID_TOKEN,
177            ResponseTypes.CODE_ID_TOKEN_TOKEN,
178        ]:
179            self.grant_type = GrantType.HYBRID
180        # Grant type validation.
181        if not self.grant_type:
182            LOGGER.warning("Invalid response type", type=self.response_type)
183            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
184
185        if self.grant_type not in self.provider.grant_types:
186            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
187            raise AuthorizeError(self.redirect_uri, "invalid_request", self.grant_type, self.state)
188
189        if self.response_mode not in ResponseMode.values:
190            self.response_mode = ResponseMode.QUERY
191
192            if self.grant_type in [GrantType.IMPLICIT, GrantType.HYBRID]:
193                self.response_mode = ResponseMode.FRAGMENT

Check grant

def check_redirect_uri(self):
195    def check_redirect_uri(self):
196        """Redirect URI validation."""
197        allowed_redirect_urls = self.provider.authorization_redirect_uris
198        if not self.redirect_uri:
199            LOGGER.warning("Missing redirect uri.")
200            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
201
202        match_found = False
203        for allowed in allowed_redirect_urls:
204            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
205                if self.redirect_uri == allowed.url:
206                    match_found = True
207                    break
208            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
209                try:
210                    if fullmatch(allowed.url, self.redirect_uri):
211                        match_found = True
212                        break
213                except RegexError as exc:
214                    LOGGER.warning(
215                        "Failed to parse regular expression",
216                        exc=exc,
217                        url=allowed.url,
218                        provider=self.provider,
219                    )
220        if not match_found:
221            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
222                "redirect_uri_no_match"
223            )
224        # Check against forbidden schemes
225        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
226            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
227                "redirect_uri_forbidden_scheme"
228            )

Redirect URI validation.

def check_scope(self, github_compat=False):
230    def check_scope(self, github_compat=False):
231        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
232        default_scope_names = set(
233            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
234                "scope_name", flat=True
235            )
236        )
237        if len(self.scope) == 0:
238            self.scope = default_scope_names
239            LOGGER.info(
240                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
241            )
242        scopes_to_check = self.scope
243        if github_compat:
244            scopes_to_check = self.scope - SCOPE_GITHUB
245        if not scopes_to_check.issubset(default_scope_names):
246            LOGGER.info(
247                "Application requested scopes not configured, setting to overlap",
248                scope_allowed=default_scope_names,
249                scope_given=self.scope,
250            )
251            self.scope = self.scope.intersection(default_scope_names)
252        if SCOPE_OPENID not in self.scope and (
253            self.grant_type == GrantType.HYBRID
254            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
255        ):
256            LOGGER.warning("Missing 'openid' scope.")
257            raise AuthorizeError(
258                self.redirect_uri, "invalid_scope", self.grant_type, self.state
259            ).with_cause("scope_openid_missing")
260        if SCOPE_OFFLINE_ACCESS in self.scope:
261            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
262            # Don't explicitly request consent with offline_access, as the spec allows for
263            # "other conditions for processing the request permitting offline access to the
264            # requested resources are in place"
265            # which we interpret as "the admin picks an authorization flow with or without consent"
266            if self.response_type not in [
267                ResponseTypes.CODE,
268                ResponseTypes.CODE_TOKEN,
269                ResponseTypes.CODE_ID_TOKEN,
270                ResponseTypes.CODE_ID_TOKEN_TOKEN,
271            ]:
272                # offline_access requires a response type that has some sort of token
273                # Spec says to ignore the scope when the response_type wouldn't result
274                # in an authorization code being generated
275                self.scope.remove(SCOPE_OFFLINE_ACCESS)

Ensure openid scope is set in Hybrid flows, or when requesting an id_token

def check_nonce(self):
277    def check_nonce(self):
278        """Nonce parameter validation."""
279        # nonce is required for all flows that return an id_token from the authorization endpoint,
280        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
281        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
282        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
283        if self.response_type not in [
284            ResponseTypes.ID_TOKEN,
285            ResponseTypes.ID_TOKEN_TOKEN,
286            ResponseTypes.CODE_ID_TOKEN,
287            ResponseTypes.CODE_ID_TOKEN_TOKEN,
288        ]:
289            return
290        if SCOPE_OPENID not in self.scope:
291            return
292        if not self.nonce:
293            LOGGER.warning("Missing nonce for OpenID Request")
294            raise AuthorizeError(
295                self.redirect_uri, "invalid_request", self.grant_type, self.state
296            ).with_cause("nonce_missing")

Nonce parameter validation.

def check_code_challenge(self):
298    def check_code_challenge(self):
299        """PKCE validation of the transformation method."""
300        if self.code_challenge and self.code_challenge_method not in [
301            PKCE_METHOD_PLAIN,
302            PKCE_METHOD_S256,
303        ]:
304            raise AuthorizeError(
305                self.redirect_uri,
306                "invalid_request",
307                self.grant_type,
308                self.state,
309                f"Unsupported challenge method {self.code_challenge_method}",
310            )

PKCE validation of the transformation method.

def create_code( self, request: django.http.request.HttpRequest) -> authentik.providers.oauth2.models.AuthorizationCode:
312    def create_code(self, request: HttpRequest) -> AuthorizationCode:
313        """Create an AuthorizationCode object for the request"""
314        auth_event = get_login_event(request)
315
316        now = timezone.now()
317
318        code = AuthorizationCode(
319            user=request.user,
320            provider=self.provider,
321            auth_time=auth_event.created if auth_event else now,
322            code=uuid4().hex,
323            expires=now + timedelta_from_string(self.provider.access_code_validity),
324            scope=self.scope,
325            nonce=self.nonce,
326            session=request.session["authenticatedsession"],
327        )
328
329        if self.code_challenge and self.code_challenge_method:
330            code.code_challenge = self.code_challenge
331            code.code_challenge_method = self.code_challenge_method
332
333        return code

Create an AuthorizationCode object for the request

class AuthorizationFlowInitView(authentik.policies.views.PolicyAccessView):
class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Look up provider and application from the `client_id` query parameter,
        responding with a 404 when no provider matches"""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass the login hint query parameter, when given, on as pending user identifier"""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Add the parsed OAuth2 parameters to the policy request context"""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available"""
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if not translation.check_for_language(language):
                    continue
                try:
                    selected_language = translation.get_supported_language_variant(language)
                except LookupError:
                    # Raised for a language that is not supported; the value is
                    # client-supplied, so skip it and try the next requested locale
                    continue
                LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
                break
            translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            # Persist the selection in the language cookie
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                # Carry the language over to the redirect target as `locale` query parameter
                parsed_url = urlparse(response.url)
                query_args = parse_qs(parsed_url.query)
                query_args["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(query_args, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell"""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        # The fulfillment stage always runs last
        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            #  we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )

OAuth2 Flow initializer, checks access to application and starts flow

github_compat = False
def pre_permission_check(self):
344    def pre_permission_check(self):
345        """Check prompt parameter before checking permission/authentication,
346        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6"""
347        # Quick sanity check at the beginning to prevent event spamming
348        if len(self.request.GET) < 1:
349            raise Http404
350        try:
351            self.params = OAuthAuthorizationParams.from_request(
352                self.request, github_compat=self.github_compat
353            )
354        except AuthorizeError as error:
355            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
356            raise RequestValidationError(error.get_response(self.request)) from None
357        except OAuth2Error as error:
358            LOGGER.warning(error.description, cause=error.cause)
359            raise RequestValidationError(
360                bad_request_message(self.request, error.description, title=error.error)
361            ) from None
362        except OAuth2Provider.DoesNotExist:
363            raise Http404 from None
364        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
365            # When "prompt" is set to "none" but the user is not logged in, show an error message
366            error = AuthorizeError(
367                self.params.redirect_uri,
368                "login_required",
369                self.params.grant_type,
370                self.params.state,
371            )
372            raise RequestValidationError(error.get_response(self.request))

Check prompt parameter before checking permission/authentication, see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

def resolve_provider_application(self):
374    def resolve_provider_application(self):
375        client_id = self.request.GET.get("client_id")
376        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
377        self.application = self.provider.application

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def modify_flow_context( self, flow: authentik.flows.models.Flow, context: dict[str, typing.Any]) -> dict[str, typing.Any]:
379    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
380        if QS_LOGIN_HINT in self.request.GET:
381            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
382        return super().modify_flow_context(flow, context)

optionally modify the flow context which is used for the authentication flow

def modify_policy_request( self, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyRequest:
384    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
385        request.context["oauth_scopes"] = self.params.scope
386        request.context["oauth_grant_type"] = self.params.grant_type
387        request.context["oauth_code_challenge"] = self.params.code_challenge
388        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
389        request.context["oauth_max_age"] = self.params.max_age
390        request.context["oauth_redirect_uri"] = self.params.redirect_uri
391        request.context["oauth_response_type"] = self.params.response_type
392        return request

optionally modify the policy request

def dispatch_with_language( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
394    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
395        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
396        available"""
397        selected_language = None
398        if UI_LOCALES in self.request.GET:
399            languages = str(self.request.GET[UI_LOCALES]).split(" ")
400            for language in languages:
401                if translation.check_for_language(language):
402                    selected_language = translation.get_supported_language_variant(language)
403                    LOGGER.debug(
404                        "Activating language from oidc ui_locales", locale=selected_language
405                    )
406                    break
407            translation.activate(selected_language)
408        response = super().dispatch(request, *args, **kwargs)
409        if selected_language:
410            response.set_cookie(
411                settings.LANGUAGE_COOKIE_NAME,
412                selected_language,
413                max_age=settings.LANGUAGE_COOKIE_AGE,
414                path=settings.LANGUAGE_COOKIE_PATH,
415                domain=settings.LANGUAGE_COOKIE_DOMAIN,
416                secure=settings.LANGUAGE_COOKIE_SECURE,
417                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
418                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
419            )
420            if isinstance(response, HttpResponseRedirect):
421                parsed_url = urlparse(response.url)
422                args = parse_qs(parsed_url.query)
423                args["locale"] = selected_language
424                response["Location"] = urlunparse(
425                    parsed_url._replace(query=urlencode(args, quote_via=quote, doseq=True))
426                )
427        return response

Activate language from OIDC specific ui_locales parameter, picking the earliest one available

def dispatch(self, request: django.http.request.HttpRequest, *args, **kwargs):
429    def dispatch(self, request: HttpRequest, *args, **kwargs):
430        # Activate language before parsing params (error messages should be localized)
431        return self.dispatch_with_language(request, *args, **kwargs)
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
433    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
434        """Start FlowPLanner, return to flow executor shell"""
435        # Require a login event to be set, otherwise make the user re-login
436        login_event = get_login_event(request)
437        if not login_event:
438            LOGGER.warning("request with no login event")
439            return self.handle_no_permission()
440        login_uid = str(login_event.pk)
441        # After we've checked permissions, and the user has access, check if we need
442        # to re-authenticate the user
443        if self.params.max_age:
444            # Attempt to check via the session's login event if set, otherwise we can't
445            # check
446            login_time = login_event.created
447            current_age: timedelta = timezone.now() - login_time
448            if current_age.total_seconds() > self.params.max_age:
449                LOGGER.debug(
450                    "Triggering authentication as max_age requirement",
451                    max_age=self.params.max_age,
452                    ago=int(current_age.total_seconds()),
453                )
454                # Since we already need to re-authenticate the user, set the old login UID
455                # in case this request has both max_age and prompt=login
456                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
457                return self.handle_no_permission()
458        # If prompt=login, we need to re-authenticate the user regardless
459        # Check if we're not already doing the re-authentication
460        if PROMPT_LOGIN in self.params.prompt:
461            # No previous login UID saved, so save the current uid and trigger
462            # re-login, or previous login UID matches current one, so no re-login happened yet
463            if (
464                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
465                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
466            ):
467                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
468                return self.handle_no_permission()
469        scope_descriptions = UserInfoView().get_scope_descriptions(
470            self.params.scope, self.params.provider
471        )
472        # Regardless, we start the planner and return to it
473        planner = FlowPlanner(self.provider.authorization_flow)
474        planner.allow_empty_flows = True
475        try:
476            plan = planner.plan(
477                self.request,
478                {
479                    PLAN_CONTEXT_SSO: True,
480                    PLAN_CONTEXT_APPLICATION: self.application,
481                    # OAuth2 related params
482                    PLAN_CONTEXT_PARAMS: self.params,
483                    # Consent related params
484                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
485                    % {"application": self.application.name},
486                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
487                },
488            )
489        except FlowNonApplicableException:
490            return self.handle_no_permission_authenticated()
491        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
492        # need to inject a consent stage
493        if PROMPT_CONSENT in self.params.prompt:
494            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
495                # Plan does not have any consent stage, so we add an in-memory one
496                stage = ConsentStage(
497                    name="OAuth2 Provider In-memory consent stage",
498                    mode=ConsentMode.ALWAYS_REQUIRE,
499                )
500                plan.append_stage(stage)
501
502        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))
503
504        return plan.to_redirect(
505            self.request,
506            self.provider.authorization_flow,
507            # We can only skip the flow executor and directly go to the final redirect URL if
508            #  we can submit the data to the RP via URL
509            allowed_silent_types=(
510                [OAuthFulfillmentStage]
511                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
512                else []
513            ),
514        )

Start FlowPlanner, return to flow executor shell

class OAuthFulfillmentStage(authentik.flows.stage.StageView):
class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow.

    Takes the authorization parameters and the application out of the flow plan
    context, issues the authorization code and/or tokens and sends the result to
    the redirect URI using the requested response mode."""

    # Parsed authorization request, taken from the flow plan context in `get`
    params: OAuthAuthorizationParams
    # Provider of the application, looked up in `get`
    provider: OAuth2Provider
    # Application that is being authorized, taken from the flow plan context in `get`
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

        For the `form_post` response mode no redirect is sent; instead the query
        parameters of `uri` are returned as an autosubmit challenge that targets the
        redirect URI."""
        parsed = urlparse(uri)

        if self.params.response_mode != ResponseMode.FORM_POST:
            self.executor.stage_ok()
            return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

        # parse_qs wraps every value in a list, however the autosubmit challenge
        # needs a flat dictionary, so only the first value of each key is used
        form_fields = {key: values[0] for key, values in parse_qs(parsed.query).items()}
        # Translate the template first and fill in the name afterwards, otherwise
        # the lookup is done with the already formatted string and never matches
        default_title = _("Redirecting to {app}...").format_map({"app": self.application.name})
        challenge = AutosubmitChallenge(
            data={
                "component": "ak-stage-autosubmit",
                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, default_title),
                "url": self.params.redirect_uri,
                "attrs": form_fields,
            }
        )

        challenge.is_valid()
        self.executor.stage_ok()
        return HttpChallengeResponse(
            challenge=challenge,
        )

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """final Stage of an OAuth2 Flow

        Returns a bad request response when the plan context holds no authorization
        parameters. OAuth2 errors are recorded as events, mark the stage as invalid
        and are answered with the matching error response."""
        plan_context = self.executor.plan.context
        if PLAN_CONTEXT_PARAMS not in plan_context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params = plan_context.pop(PLAN_CONTEXT_PARAMS)
        self.application = plan_context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises AuthorizeError (`server_error`) when the response cannot be built
        for the requested combination of grant type and response mode."""
        uri = urlsplit(self.params.redirect_uri)
        state = str(self.params.state) if self.params.state else ""

        try:
            code = None

            if self.params.grant_type in [
                GrantType.AUTHORIZATION_CODE,
                GrantType.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                if code is None:
                    # A code is only created for the authorization code and hybrid
                    # grants; without one there is nothing to put in the query
                    # string, so answer with an OAuth error instead of crashing
                    raise OAuth2Error()
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = [state]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    query_fragment = {"code": code.code, "state": [state]}
                else:
                    query_fragment = self.create_implicit_response(code)

                # NOTE(review): a fragment already present on the redirect URI is
                # prepended without any separator - confirm this is intended
                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
                    post_params = {"code": code.code, "state": [state]}
                else:
                    post_params = self.create_implicit_response(code)

                # The parameters travel in the query string of the returned URI and
                # are turned into form fields by `redirect`
                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary

        Creates and saves an access token for the current user and returns the
        response parameters selected by the requested response type. `code` is the
        authorization code created for the hybrid grant, None otherwise."""
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            # Fall back to the current time if no login event is known
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
            token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantType.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment

Final stage, restores params from Flow.

def redirect(self, uri: str) -> django.http.response.HttpResponse:
524    def redirect(self, uri: str) -> HttpResponse:
525        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes"""
526        parsed = urlparse(uri)
527
528        if self.params.response_mode == ResponseMode.FORM_POST:
529            # parse_qs returns a dictionary with values wrapped in lists, however
530            # we need a flat dictionary for the autosubmit challenge
531
532            # this picks the first item in the list if the value is a list,
533            # otherwise just the value as-is
534            query_params = dict(
535                (k, v[0] if isinstance(v, list) else v) for k, v in parse_qs(parsed.query).items()
536            )
537
538            challenge = AutosubmitChallenge(
539                data={
540                    "component": "ak-stage-autosubmit",
541                    "title": self.executor.plan.context.get(
542                        PLAN_CONTEXT_TITLE,
543                        _("Redirecting to {app}...".format_map({"app": self.application.name})),
544                    ),
545                    "url": self.params.redirect_uri,
546                    "attrs": query_params,
547                }
548            )
549
550            challenge.is_valid()
551            self.executor.stage_ok()
552            return HttpChallengeResponse(
553                challenge=challenge,
554            )
555        self.executor.stage_ok()
556        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

def post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
558    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
559        """Wrapper when this stage gets hit with a post request"""
560        return self.get(request, *args, **kwargs)

Wrapper when this stage gets hit with a post request

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
562    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
563        """final Stage of an OAuth2 Flow"""
564        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
565            LOGGER.warning("Got to fulfillment stage with no pending context")
566            return HttpResponseBadRequest()
567        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
568        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
569        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
570        try:
571            # At this point we don't need to check permissions anymore
572            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
573                raise AuthorizeError(
574                    self.params.redirect_uri,
575                    "consent_required",
576                    self.params.grant_type,
577                    self.params.state,
578                )
579            Event.new(
580                EventAction.AUTHORIZE_APPLICATION,
581                authorized_application=self.application,
582                flow=self.executor.plan.flow_pk,
583                scopes=" ".join(self.params.scope),
584            ).from_http(self.request)
585            return self.redirect(self.create_response_uri())
586        except (ClientIdError, RedirectUriError) as error:
587            error.to_event(application=self.application).from_http(request)
588            self.executor.stage_invalid()
589
590            return bad_request_message(request, error.description, title=error.error)
591        except AuthorizeError as error:
592            error.to_event(application=self.application).from_http(request)
593            self.executor.stage_invalid()
594            return error.get_response(self.request)

Final stage of an OAuth2 flow

def create_response_uri(self) -> str:
596    def create_response_uri(self) -> str:
597        """Create a final Response URI the user is redirected to."""
598        uri = urlsplit(self.params.redirect_uri)
599
600        try:
601            code = None
602
603            if self.params.grant_type in [
604                GrantType.AUTHORIZATION_CODE,
605                GrantType.HYBRID,
606            ]:
607                code = self.params.create_code(self.request)
608                code.save()
609
610            if self.params.response_mode == ResponseMode.QUERY:
611                query_params = parse_qs(uri.query)
612                query_params["code"] = code.code
613                query_params["state"] = [str(self.params.state) if self.params.state else ""]
614
615                uri = uri._replace(query=urlencode(query_params, doseq=True))
616                return urlunsplit(uri)
617
618            if self.params.response_mode == ResponseMode.FRAGMENT:
619                query_fragment = {}
620                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
621                    query_fragment["code"] = code.code
622                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
623                else:
624                    query_fragment = self.create_implicit_response(code)
625
626                uri = uri._replace(
627                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
628                )
629
630                return urlunsplit(uri)
631
632            if self.params.response_mode == ResponseMode.FORM_POST:
633                post_params = {}
634                if self.params.grant_type in [GrantType.AUTHORIZATION_CODE]:
635                    post_params["code"] = code.code
636                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
637                else:
638                    post_params = self.create_implicit_response(code)
639
640                uri = uri._replace(query=urlencode(post_params, doseq=True))
641
642                return urlunsplit(uri)
643
644            raise OAuth2Error()
645        except OAuth2Error as error:
646            LOGGER.warning("Error when trying to create response uri", error=error)
647            raise AuthorizeError(
648                self.params.redirect_uri,
649                "server_error",
650                self.params.grant_type,
651                self.params.state,
652            ) from None

Create a final Response URI the user is redirected to.

def create_implicit_response( self, code: authentik.providers.oauth2.models.AuthorizationCode | None) -> dict:
654    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
655        """Create implicit response's URL Fragment dictionary"""
656        query_fragment = {}
657        auth_event = get_login_event(self.request)
658
659        now = timezone.now()
660        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
661        token = AccessToken(
662            user=self.request.user,
663            scope=self.params.scope,
664            expires=access_token_expiry,
665            provider=self.provider,
666            auth_time=auth_event.created if auth_event else now,
667            session=self.request.session["authenticatedsession"],
668        )
669
670        id_token = IDToken.new(self.provider, token, self.request)
671        id_token.nonce = self.params.nonce
672
673        if self.params.response_type in [
674            ResponseTypes.CODE_ID_TOKEN,
675            ResponseTypes.CODE_ID_TOKEN_TOKEN,
676        ]:
677            id_token.c_hash = code.c_hash
678        token.id_token = id_token
679
680        # Check if response_type must include access_token in the response.
681        if self.params.response_type in [
682            ResponseTypes.ID_TOKEN_TOKEN,
683            ResponseTypes.CODE_ID_TOKEN_TOKEN,
684            ResponseTypes.CODE_TOKEN,
685        ]:
686            query_fragment["access_token"] = token.token
687            # Get at_hash of the current token and update the id_token
688            id_token.at_hash = token.at_hash
689
690        # Check if response_type must include id_token in the response.
691        if self.params.response_type in [
692            ResponseTypes.ID_TOKEN,
693            ResponseTypes.ID_TOKEN_TOKEN,
694            ResponseTypes.CODE_ID_TOKEN,
695            ResponseTypes.CODE_ID_TOKEN_TOKEN,
696        ]:
697            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
698            token._id_token = dumps(id_token.to_dict())
699
700        token.save()
701
702        # Code parameter must be present if it's Hybrid Flow.
703        if self.params.grant_type == GrantType.HYBRID:
704            query_fragment["code"] = code.code
705
706        query_fragment["token_type"] = TOKEN_TYPE
707        query_fragment["expires_in"] = int(
708            timedelta_from_string(self.provider.access_token_validity).total_seconds()
709        )
710        query_fragment["state"] = self.params.state if self.params.state else ""
711        return query_fragment

Create implicit response's URL Fragment dictionary