authentik.providers.oauth2.views.authorize

authentik OAuth2 Authorization views

  1"""authentik OAuth2 Authorization views"""
  2
  3from dataclasses import InitVar, dataclass, field
  4from datetime import timedelta
  5from json import dumps
  6from re import error as RegexError
  7from re import fullmatch
  8from typing import Any
  9from urllib.parse import parse_qs, quote, urlencode, urlparse, urlsplit, urlunparse, urlunsplit
 10from uuid import uuid4
 11
 12from django.conf import settings
 13from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
 14from django.http.response import Http404, HttpResponseBadRequest
 15from django.shortcuts import get_object_or_404
 16from django.utils import timezone, translation
 17from django.utils.translation import gettext as _
 18from structlog.stdlib import get_logger
 19
 20from authentik.common.oauth.constants import (
 21    PKCE_METHOD_PLAIN,
 22    PKCE_METHOD_S256,
 23    PROMPT_CONSENT,
 24    PROMPT_LOGIN,
 25    PROMPT_NONE,
 26    QS_LOGIN_HINT,
 27    SCOPE_GITHUB,
 28    SCOPE_OFFLINE_ACCESS,
 29    SCOPE_OPENID,
 30    TOKEN_TYPE,
 31    UI_LOCALES,
 32)
 33from authentik.core.models import Application
 34from authentik.events.models import Event, EventAction
 35from authentik.events.signals import get_login_event
 36from authentik.flows.challenge import (
 37    PLAN_CONTEXT_TITLE,
 38    AutosubmitChallenge,
 39    HttpChallengeResponse,
 40)
 41from authentik.flows.exceptions import FlowNonApplicableException
 42from authentik.flows.models import Flow, in_memory_stage
 43from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_SSO, FlowPlanner
 44from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, StageView
 45from authentik.lib.utils.time import timedelta_from_string
 46from authentik.lib.views import bad_request_message
 47from authentik.policies.types import PolicyRequest
 48from authentik.policies.views import PolicyAccessView, RequestValidationError
 49from authentik.providers.oauth2.errors import (
 50    AuthorizeError,
 51    ClientIdError,
 52    OAuth2Error,
 53    RedirectUriError,
 54)
 55from authentik.providers.oauth2.id_token import IDToken
 56from authentik.providers.oauth2.models import (
 57    AccessToken,
 58    AuthorizationCode,
 59    GrantTypes,
 60    OAuth2Provider,
 61    RedirectURI,
 62    RedirectURIMatchingMode,
 63    ResponseMode,
 64    ResponseTypes,
 65    ScopeMapping,
 66)
 67from authentik.providers.oauth2.utils import HttpResponseRedirectScheme
 68from authentik.providers.oauth2.views.userinfo import UserInfoView
 69from authentik.stages.consent.models import ConsentMode, ConsentStage
 70from authentik.stages.consent.stage import (
 71    PLAN_CONTEXT_CONSENT_HEADER,
 72    PLAN_CONTEXT_CONSENT_PERMISSIONS,
 73)
 74
# Module-wide structlog logger used by every view and helper below
LOGGER = get_logger()

# Flow plan context key under which the validated authorization parameters are stored
PLAN_CONTEXT_PARAMS = "goauthentik.io/providers/oauth2/params"
# Session key remembering the login event UID seen when a re-authentication was triggered
SESSION_KEY_LAST_LOGIN_UID = "authentik/providers/oauth2/last_login_uid"

# `prompt` values that are honoured; any other value in the request is dropped
ALLOWED_PROMPT_PARAMS = {PROMPT_NONE, PROMPT_CONSENT, PROMPT_LOGIN}
# URI schemes that are rejected for a redirect_uri even when it matches an allowed entry
FORBIDDEN_URI_SCHEMES = {"javascript", "data", "vbscript"}
 82
 83
 84@dataclass(slots=True)
 85class OAuthAuthorizationParams:
 86    """Parameters required to authorize an OAuth Client"""
 87
 88    client_id: str
 89    redirect_uri: str
 90    response_type: str
 91    response_mode: str | None
 92    scope: set[str]
 93    state: str
 94    nonce: str | None
 95    prompt: set[str]
 96    grant_type: str
 97
 98    provider: OAuth2Provider = field(default_factory=OAuth2Provider)
 99
100    request: str | None = None
101
102    max_age: int | None = None
103
104    code_challenge: str | None = None
105    code_challenge_method: str | None = None
106
107    github_compat: InitVar[bool] = False
108
109    @staticmethod
110    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
111        """
112        Get all the params used by the Authorization Code Flow
113        (and also for the Implicit and Hybrid).
114
115        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
116        """
117        # Because in this endpoint we handle both GET
118        # and POST request.
119        query_dict = request.POST if request.method == "POST" else request.GET
120        state = query_dict.get("state")
121        redirect_uri = query_dict.get("redirect_uri", "")
122
123        response_type = query_dict.get("response_type", "")
124
125        # Validate and check the response_mode against the predefined dict
126        # Set to Query or Fragment if not defined in request
127        response_mode = query_dict.get("response_mode", False)
128
129        max_age = query_dict.get("max_age")
130        return OAuthAuthorizationParams(
131            client_id=query_dict.get("client_id", ""),
132            redirect_uri=redirect_uri,
133            response_type=response_type,
134            response_mode=response_mode,
135            grant_type="",
136            scope=set(query_dict.get("scope", "").split()),
137            state=state,
138            nonce=query_dict.get("nonce"),
139            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
140            request=query_dict.get("request", None),
141            max_age=int(max_age) if max_age else None,
142            code_challenge=query_dict.get("code_challenge"),
143            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
144            github_compat=github_compat,
145        )
146
147    def __post_init__(self, github_compat=False):
148        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
149            client_id=self.client_id
150        ).first()
151        if not self.provider:
152            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
153            raise ClientIdError(client_id=self.client_id)
154        self.check_redirect_uri()
155        self.check_grant()
156        self.check_scope(github_compat)
157        if self.request:
158            raise AuthorizeError(
159                self.redirect_uri, "request_not_supported", self.grant_type, self.state
160            )
161        self.check_nonce()
162        self.check_code_challenge()
163
164    def check_grant(self):
165        """Check grant"""
166        # Determine which flow to use.
167        if self.response_type in [ResponseTypes.CODE]:
168            self.grant_type = GrantTypes.AUTHORIZATION_CODE
169        elif self.response_type in [
170            ResponseTypes.ID_TOKEN,
171            ResponseTypes.ID_TOKEN_TOKEN,
172        ]:
173            self.grant_type = GrantTypes.IMPLICIT
174        elif self.response_type in [
175            ResponseTypes.CODE_TOKEN,
176            ResponseTypes.CODE_ID_TOKEN,
177            ResponseTypes.CODE_ID_TOKEN_TOKEN,
178        ]:
179            self.grant_type = GrantTypes.HYBRID
180
181        # Grant type validation.
182        if not self.grant_type:
183            LOGGER.warning("Invalid response type", type=self.response_type)
184            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
185
186        if self.response_mode not in ResponseMode.values:
187            self.response_mode = ResponseMode.QUERY
188
189            if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
190                self.response_mode = ResponseMode.FRAGMENT
191
192    def check_redirect_uri(self):
193        """Redirect URI validation."""
194        allowed_redirect_urls = self.provider.redirect_uris
195        if not self.redirect_uri:
196            LOGGER.warning("Missing redirect uri.")
197            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
198
199        if len(allowed_redirect_urls) < 1:
200            LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri)
201            self.provider.redirect_uris = [
202                RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri)
203            ]
204            self.provider.save()
205            allowed_redirect_urls = self.provider.redirect_uris
206
207        match_found = False
208        for allowed in allowed_redirect_urls:
209            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
210                if self.redirect_uri == allowed.url:
211                    match_found = True
212                    break
213            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
214                try:
215                    if fullmatch(allowed.url, self.redirect_uri):
216                        match_found = True
217                        break
218                except RegexError as exc:
219                    LOGGER.warning(
220                        "Failed to parse regular expression",
221                        exc=exc,
222                        url=allowed.url,
223                        provider=self.provider,
224                    )
225        if not match_found:
226            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
227                "redirect_uri_no_match"
228            )
229        # Check against forbidden schemes
230        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
231            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
232                "redirect_uri_forbidden_scheme"
233            )
234
235    def check_scope(self, github_compat=False):
236        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
237        default_scope_names = set(
238            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
239                "scope_name", flat=True
240            )
241        )
242        if len(self.scope) == 0:
243            self.scope = default_scope_names
244            LOGGER.info(
245                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
246            )
247        scopes_to_check = self.scope
248        if github_compat:
249            scopes_to_check = self.scope - SCOPE_GITHUB
250        if not scopes_to_check.issubset(default_scope_names):
251            LOGGER.info(
252                "Application requested scopes not configured, setting to overlap",
253                scope_allowed=default_scope_names,
254                scope_given=self.scope,
255            )
256            self.scope = self.scope.intersection(default_scope_names)
257        if SCOPE_OPENID not in self.scope and (
258            self.grant_type == GrantTypes.HYBRID
259            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
260        ):
261            LOGGER.warning("Missing 'openid' scope.")
262            raise AuthorizeError(
263                self.redirect_uri, "invalid_scope", self.grant_type, self.state
264            ).with_cause("scope_openid_missing")
265        if SCOPE_OFFLINE_ACCESS in self.scope:
266            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
267            # Don't explicitly request consent with offline_access, as the spec allows for
268            # "other conditions for processing the request permitting offline access to the
269            # requested resources are in place"
270            # which we interpret as "the admin picks an authorization flow with or without consent"
271            if self.response_type not in [
272                ResponseTypes.CODE,
273                ResponseTypes.CODE_TOKEN,
274                ResponseTypes.CODE_ID_TOKEN,
275                ResponseTypes.CODE_ID_TOKEN_TOKEN,
276            ]:
277                # offline_access requires a response type that has some sort of token
278                # Spec says to ignore the scope when the response_type wouldn't result
279                # in an authorization code being generated
280                self.scope.remove(SCOPE_OFFLINE_ACCESS)
281
282    def check_nonce(self):
283        """Nonce parameter validation."""
284        # nonce is required for all flows that return an id_token from the authorization endpoint,
285        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
286        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
287        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
288        if self.response_type not in [
289            ResponseTypes.ID_TOKEN,
290            ResponseTypes.ID_TOKEN_TOKEN,
291            ResponseTypes.CODE_ID_TOKEN,
292            ResponseTypes.CODE_ID_TOKEN_TOKEN,
293        ]:
294            return
295        if SCOPE_OPENID not in self.scope:
296            return
297        if not self.nonce:
298            LOGGER.warning("Missing nonce for OpenID Request")
299            raise AuthorizeError(
300                self.redirect_uri, "invalid_request", self.grant_type, self.state
301            ).with_cause("nonce_missing")
302
303    def check_code_challenge(self):
304        """PKCE validation of the transformation method."""
305        if self.code_challenge and self.code_challenge_method not in [
306            PKCE_METHOD_PLAIN,
307            PKCE_METHOD_S256,
308        ]:
309            raise AuthorizeError(
310                self.redirect_uri,
311                "invalid_request",
312                self.grant_type,
313                self.state,
314                f"Unsupported challenge method {self.code_challenge_method}",
315            )
316
317    def create_code(self, request: HttpRequest) -> AuthorizationCode:
318        """Create an AuthorizationCode object for the request"""
319        auth_event = get_login_event(request)
320
321        now = timezone.now()
322
323        code = AuthorizationCode(
324            user=request.user,
325            provider=self.provider,
326            auth_time=auth_event.created if auth_event else now,
327            code=uuid4().hex,
328            expires=now + timedelta_from_string(self.provider.access_code_validity),
329            scope=self.scope,
330            nonce=self.nonce,
331            session=request.session["authenticatedsession"],
332        )
333
334        if self.code_challenge and self.code_challenge_method:
335            code.code_challenge = self.code_challenge
336            code.code_challenge_method = self.code_challenge_method
337
338        return code
339
340
class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

        Parses and validates the authorization parameters into `self.params`. Validation
        failures are turned into `RequestValidationError` carrying the response to send."""
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Load the provider by the `client_id` query parameter (404 if unknown) and the
        application it belongs to"""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass the login hint from the query string, if given, into the flow context as
        the pending user identifier"""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Expose the validated OAuth2 parameters to policies via the request context"""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available

        When a language was selected, it is also stored in the language cookie and, for
        redirect responses, appended to the target URL as `locale`."""
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if not translation.check_for_language(language):
                    continue
                try:
                    selected_language = translation.get_supported_language_variant(language)
                except LookupError:
                    # A locale can have translation files available without being an
                    # enabled language; skip it instead of failing the request
                    continue
                LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
                break
            if selected_language:
                translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                # Carry the language over to the redirect target via a query parameter
                parsed_url = urlparse(response.url)
                redirect_query = parse_qs(parsed_url.query)
                redirect_query["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(
                        query=urlencode(redirect_query, quote_via=quote, doseq=True)
                    )
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        """Dispatch the request with the ui_locales language handling applied"""
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPlanner, return to flow executor shell

        Forces a re-login first when there is no login event, when `max_age` is exceeded
        or when `prompt=login` has not been satisfied yet."""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        # The fulfillment stage always runs last and produces the response to the client
        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            #  we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )
520
521
class OAuthFulfillmentStage(StageView):
    """Final stage, restores params from Flow."""

    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

        For the form_post response mode, the query parameters of `uri` are instead sent
        to the redirect URI through an auto-submitting form challenge."""
        parsed = urlparse(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            # parse_qs returns a dictionary with values wrapped in lists, however
            # we need a flat dictionary for the autosubmit challenge

            # this picks the first item in the list if the value is a list,
            # otherwise just the value as-is
            query_params = {
                k: v[0] if isinstance(v, list) else v for k, v in parse_qs(parsed.query).items()
            }

            challenge = AutosubmitChallenge(
                data={
                    "component": "ak-stage-autosubmit",
                    "title": self.executor.plan.context.get(
                        PLAN_CONTEXT_TITLE,
                        # Translate the template first and interpolate afterwards, so the
                        # lookup uses the untranslated message ID
                        _("Redirecting to {app}...").format_map({"app": self.application.name}),
                    ),
                    "url": self.params.redirect_uri,
                    "attrs": query_params,
                }
            )

            challenge.is_valid()
            self.executor.stage_ok()
            return HttpChallengeResponse(
                challenge=challenge,
            )
        self.executor.stage_ok()
        # Explicitly allow the scheme of the target URI for this redirect
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """final Stage of an OAuth2 Flow

        Takes the authorization parameters and application out of the plan context,
        records the authorization event and sends the response to the client. OAuth2
        errors are recorded as events and mark the stage as invalid."""
        if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = self.executor.plan.context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = self.executor.plan.context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            # prompt=none together with prompt=consent is answered with consent_required
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create a final Response URI the user is redirected to.

        Raises `AuthorizeError` (`server_error`) when the response cannot be built for
        the requested grant type and response mode."""
        uri = urlsplit(self.params.redirect_uri)
        state_values = [str(self.params.state) if self.params.state else ""]

        try:
            code = None

            if self.params.grant_type in [
                GrantTypes.AUTHORIZATION_CODE,
                GrantTypes.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                if code is None:
                    # No authorization code exists for the implicit grant, so there is
                    # nothing that can be returned in the query string
                    LOGGER.warning(
                        "No authorization code available for query response mode",
                        grant_type=self.params.grant_type,
                    )
                    raise OAuth2Error()
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = state_values

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = state_values
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                post_params = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = state_values
                else:
                    post_params = self.create_implicit_response(code)

                # The parameters travel in the query string here; `redirect` moves them
                # into the auto-submitted form
                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            # Unknown response mode
            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create implicit response's URL Fragment dictionary

        Creates and saves an access token, and depending on the response type adds the
        access token, the ID token and (for the hybrid flow) the authorization code."""
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=access_token_expiry,
            provider=self.provider,
            # Fall back to the current time when no login event is available
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        # These response types are hybrid flows, so a code has been created for them
        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
            token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantTypes.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(
            timedelta_from_string(self.provider.access_token_validity).total_seconds()
        )
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PARAMS = 'goauthentik.io/providers/oauth2/params'
SESSION_KEY_LAST_LOGIN_UID = 'authentik/providers/oauth2/last_login_uid'
ALLOWED_PROMPT_PARAMS = {'consent', 'none', 'login'}
FORBIDDEN_URI_SCHEMES = {'data', 'vbscript', 'javascript'}
@dataclass(slots=True)
class OAuthAuthorizationParams:
 85@dataclass(slots=True)
 86class OAuthAuthorizationParams:
 87    """Parameters required to authorize an OAuth Client"""
 88
 89    client_id: str
 90    redirect_uri: str
 91    response_type: str
 92    response_mode: str | None
 93    scope: set[str]
 94    state: str
 95    nonce: str | None
 96    prompt: set[str]
 97    grant_type: str
 98
 99    provider: OAuth2Provider = field(default_factory=OAuth2Provider)
100
101    request: str | None = None
102
103    max_age: int | None = None
104
105    code_challenge: str | None = None
106    code_challenge_method: str | None = None
107
108    github_compat: InitVar[bool] = False
109
110    @staticmethod
111    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
112        """
113        Get all the params used by the Authorization Code Flow
114        (and also for the Implicit and Hybrid).
115
116        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
117        """
118        # Because in this endpoint we handle both GET
119        # and POST request.
120        query_dict = request.POST if request.method == "POST" else request.GET
121        state = query_dict.get("state")
122        redirect_uri = query_dict.get("redirect_uri", "")
123
124        response_type = query_dict.get("response_type", "")
125
126        # Validate and check the response_mode against the predefined dict
127        # Set to Query or Fragment if not defined in request
128        response_mode = query_dict.get("response_mode", False)
129
130        max_age = query_dict.get("max_age")
131        return OAuthAuthorizationParams(
132            client_id=query_dict.get("client_id", ""),
133            redirect_uri=redirect_uri,
134            response_type=response_type,
135            response_mode=response_mode,
136            grant_type="",
137            scope=set(query_dict.get("scope", "").split()),
138            state=state,
139            nonce=query_dict.get("nonce"),
140            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
141            request=query_dict.get("request", None),
142            max_age=int(max_age) if max_age else None,
143            code_challenge=query_dict.get("code_challenge"),
144            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
145            github_compat=github_compat,
146        )
147
148    def __post_init__(self, github_compat=False):
149        self.provider: OAuth2Provider = OAuth2Provider.objects.filter(
150            client_id=self.client_id
151        ).first()
152        if not self.provider:
153            LOGGER.warning("Invalid client identifier", client_id=self.client_id)
154            raise ClientIdError(client_id=self.client_id)
155        self.check_redirect_uri()
156        self.check_grant()
157        self.check_scope(github_compat)
158        if self.request:
159            raise AuthorizeError(
160                self.redirect_uri, "request_not_supported", self.grant_type, self.state
161            )
162        self.check_nonce()
163        self.check_code_challenge()
164
165    def check_grant(self):
166        """Check grant"""
167        # Determine which flow to use.
168        if self.response_type in [ResponseTypes.CODE]:
169            self.grant_type = GrantTypes.AUTHORIZATION_CODE
170        elif self.response_type in [
171            ResponseTypes.ID_TOKEN,
172            ResponseTypes.ID_TOKEN_TOKEN,
173        ]:
174            self.grant_type = GrantTypes.IMPLICIT
175        elif self.response_type in [
176            ResponseTypes.CODE_TOKEN,
177            ResponseTypes.CODE_ID_TOKEN,
178            ResponseTypes.CODE_ID_TOKEN_TOKEN,
179        ]:
180            self.grant_type = GrantTypes.HYBRID
181
182        # Grant type validation.
183        if not self.grant_type:
184            LOGGER.warning("Invalid response type", type=self.response_type)
185            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
186
187        if self.response_mode not in ResponseMode.values:
188            self.response_mode = ResponseMode.QUERY
189
190            if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
191                self.response_mode = ResponseMode.FRAGMENT
192
193    def check_redirect_uri(self):
194        """Redirect URI validation."""
195        allowed_redirect_urls = self.provider.redirect_uris
196        if not self.redirect_uri:
197            LOGGER.warning("Missing redirect uri.")
198            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
199
200        if len(allowed_redirect_urls) < 1:
201            LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri)
202            self.provider.redirect_uris = [
203                RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri)
204            ]
205            self.provider.save()
206            allowed_redirect_urls = self.provider.redirect_uris
207
208        match_found = False
209        for allowed in allowed_redirect_urls:
210            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
211                if self.redirect_uri == allowed.url:
212                    match_found = True
213                    break
214            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
215                try:
216                    if fullmatch(allowed.url, self.redirect_uri):
217                        match_found = True
218                        break
219                except RegexError as exc:
220                    LOGGER.warning(
221                        "Failed to parse regular expression",
222                        exc=exc,
223                        url=allowed.url,
224                        provider=self.provider,
225                    )
226        if not match_found:
227            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
228                "redirect_uri_no_match"
229            )
230        # Check against forbidden schemes
231        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
232            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
233                "redirect_uri_forbidden_scheme"
234            )
235
236    def check_scope(self, github_compat=False):
237        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
238        default_scope_names = set(
239            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
240                "scope_name", flat=True
241            )
242        )
243        if len(self.scope) == 0:
244            self.scope = default_scope_names
245            LOGGER.info(
246                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
247            )
248        scopes_to_check = self.scope
249        if github_compat:
250            scopes_to_check = self.scope - SCOPE_GITHUB
251        if not scopes_to_check.issubset(default_scope_names):
252            LOGGER.info(
253                "Application requested scopes not configured, setting to overlap",
254                scope_allowed=default_scope_names,
255                scope_given=self.scope,
256            )
257            self.scope = self.scope.intersection(default_scope_names)
258        if SCOPE_OPENID not in self.scope and (
259            self.grant_type == GrantTypes.HYBRID
260            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
261        ):
262            LOGGER.warning("Missing 'openid' scope.")
263            raise AuthorizeError(
264                self.redirect_uri, "invalid_scope", self.grant_type, self.state
265            ).with_cause("scope_openid_missing")
266        if SCOPE_OFFLINE_ACCESS in self.scope:
267            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
268            # Don't explicitly request consent with offline_access, as the spec allows for
269            # "other conditions for processing the request permitting offline access to the
270            # requested resources are in place"
271            # which we interpret as "the admin picks an authorization flow with or without consent"
272            if self.response_type not in [
273                ResponseTypes.CODE,
274                ResponseTypes.CODE_TOKEN,
275                ResponseTypes.CODE_ID_TOKEN,
276                ResponseTypes.CODE_ID_TOKEN_TOKEN,
277            ]:
278                # offline_access requires a response type that has some sort of token
279                # Spec says to ignore the scope when the response_type wouldn't result
280                # in an authorization code being generated
281                self.scope.remove(SCOPE_OFFLINE_ACCESS)
282
283    def check_nonce(self):
284        """Nonce parameter validation."""
285        # nonce is required for all flows that return an id_token from the authorization endpoint,
286        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
287        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
288        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
289        if self.response_type not in [
290            ResponseTypes.ID_TOKEN,
291            ResponseTypes.ID_TOKEN_TOKEN,
292            ResponseTypes.CODE_ID_TOKEN,
293            ResponseTypes.CODE_ID_TOKEN_TOKEN,
294        ]:
295            return
296        if SCOPE_OPENID not in self.scope:
297            return
298        if not self.nonce:
299            LOGGER.warning("Missing nonce for OpenID Request")
300            raise AuthorizeError(
301                self.redirect_uri, "invalid_request", self.grant_type, self.state
302            ).with_cause("nonce_missing")
303
304    def check_code_challenge(self):
305        """PKCE validation of the transformation method."""
306        if self.code_challenge and self.code_challenge_method not in [
307            PKCE_METHOD_PLAIN,
308            PKCE_METHOD_S256,
309        ]:
310            raise AuthorizeError(
311                self.redirect_uri,
312                "invalid_request",
313                self.grant_type,
314                self.state,
315                f"Unsupported challenge method {self.code_challenge_method}",
316            )
317
318    def create_code(self, request: HttpRequest) -> AuthorizationCode:
319        """Create an AuthorizationCode object for the request"""
320        auth_event = get_login_event(request)
321
322        now = timezone.now()
323
324        code = AuthorizationCode(
325            user=request.user,
326            provider=self.provider,
327            auth_time=auth_event.created if auth_event else now,
328            code=uuid4().hex,
329            expires=now + timedelta_from_string(self.provider.access_code_validity),
330            scope=self.scope,
331            nonce=self.nonce,
332            session=request.session["authenticatedsession"],
333        )
334
335        if self.code_challenge and self.code_challenge_method:
336            code.code_challenge = self.code_challenge
337            code.code_challenge_method = self.code_challenge_method
338
339        return code

Parameters required to authorize an OAuth Client

OAuthAuthorizationParams( client_id: str, redirect_uri: str, response_type: str, response_mode: str | None, scope: set[str], state: str, nonce: str | None, prompt: set[str], grant_type: str, provider: authentik.providers.oauth2.models.OAuth2Provider = <factory>, request: str | None = None, max_age: int | None = None, code_challenge: str | None = None, code_challenge_method: str | None = None, github_compat: dataclasses.InitVar[bool] = False)
client_id: str
redirect_uri: str
response_type: str
response_mode: str | None
scope: set[str]
state: str
nonce: str | None
prompt: set[str]
grant_type: str
request: str | None
max_age: int | None
code_challenge: str | None
code_challenge_method: str | None
github_compat: dataclasses.InitVar[bool] = False
@staticmethod
def from_request( request: django.http.request.HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
110    @staticmethod
111    def from_request(request: HttpRequest, github_compat=False) -> OAuthAuthorizationParams:
112        """
113        Get all the params used by the Authorization Code Flow
114        (and also for the Implicit and Hybrid).
115
116        See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest
117        """
118        # Because in this endpoint we handle both GET
119        # and POST request.
120        query_dict = request.POST if request.method == "POST" else request.GET
121        state = query_dict.get("state")
122        redirect_uri = query_dict.get("redirect_uri", "")
123
124        response_type = query_dict.get("response_type", "")
125
126        # Validate and check the response_mode against the predefined dict
127        # Set to Query or Fragment if not defined in request
128        response_mode = query_dict.get("response_mode", False)
129
130        max_age = query_dict.get("max_age")
131        return OAuthAuthorizationParams(
132            client_id=query_dict.get("client_id", ""),
133            redirect_uri=redirect_uri,
134            response_type=response_type,
135            response_mode=response_mode,
136            grant_type="",
137            scope=set(query_dict.get("scope", "").split()),
138            state=state,
139            nonce=query_dict.get("nonce"),
140            prompt=ALLOWED_PROMPT_PARAMS.intersection(set(query_dict.get("prompt", "").split())),
141            request=query_dict.get("request", None),
142            max_age=int(max_age) if max_age else None,
143            code_challenge=query_dict.get("code_challenge"),
144            code_challenge_method=query_dict.get("code_challenge_method", "plain"),
145            github_compat=github_compat,
146        )

Get all the params used by the Authorization Code Flow (and also for the Implicit and Hybrid).

See: http://openid.net/specs/openid-connect-core-1_0.html#AuthRequest

def check_grant(self):
165    def check_grant(self):
166        """Check grant"""
167        # Determine which flow to use.
168        if self.response_type in [ResponseTypes.CODE]:
169            self.grant_type = GrantTypes.AUTHORIZATION_CODE
170        elif self.response_type in [
171            ResponseTypes.ID_TOKEN,
172            ResponseTypes.ID_TOKEN_TOKEN,
173        ]:
174            self.grant_type = GrantTypes.IMPLICIT
175        elif self.response_type in [
176            ResponseTypes.CODE_TOKEN,
177            ResponseTypes.CODE_ID_TOKEN,
178            ResponseTypes.CODE_ID_TOKEN_TOKEN,
179        ]:
180            self.grant_type = GrantTypes.HYBRID
181
182        # Grant type validation.
183        if not self.grant_type:
184            LOGGER.warning("Invalid response type", type=self.response_type)
185            raise AuthorizeError(self.redirect_uri, "unsupported_response_type", "", self.state)
186
187        if self.response_mode not in ResponseMode.values:
188            self.response_mode = ResponseMode.QUERY
189
190            if self.grant_type in [GrantTypes.IMPLICIT, GrantTypes.HYBRID]:
191                self.response_mode = ResponseMode.FRAGMENT

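Check grant: derive the grant type from the requested response type and fall back to a default response mode when none (or an unknown one) was given.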

def check_redirect_uri(self):
193    def check_redirect_uri(self):
194        """Redirect URI validation."""
195        allowed_redirect_urls = self.provider.redirect_uris
196        if not self.redirect_uri:
197            LOGGER.warning("Missing redirect uri.")
198            raise RedirectUriError("", allowed_redirect_urls).with_cause("redirect_uri_missing")
199
200        if len(allowed_redirect_urls) < 1:
201            LOGGER.info("Setting redirect for blank redirect_uris", redirect=self.redirect_uri)
202            self.provider.redirect_uris = [
203                RedirectURI(RedirectURIMatchingMode.STRICT, self.redirect_uri)
204            ]
205            self.provider.save()
206            allowed_redirect_urls = self.provider.redirect_uris
207
208        match_found = False
209        for allowed in allowed_redirect_urls:
210            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
211                if self.redirect_uri == allowed.url:
212                    match_found = True
213                    break
214            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
215                try:
216                    if fullmatch(allowed.url, self.redirect_uri):
217                        match_found = True
218                        break
219                except RegexError as exc:
220                    LOGGER.warning(
221                        "Failed to parse regular expression",
222                        exc=exc,
223                        url=allowed.url,
224                        provider=self.provider,
225                    )
226        if not match_found:
227            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
228                "redirect_uri_no_match"
229            )
230        # Check against forbidden schemes
231        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
232            raise RedirectUriError(self.redirect_uri, allowed_redirect_urls).with_cause(
233                "redirect_uri_forbidden_scheme"
234            )

Redirect URI validation.

def check_scope(self, github_compat=False):
236    def check_scope(self, github_compat=False):
237        """Ensure openid scope is set in Hybrid flows, or when requesting an id_token"""
238        default_scope_names = set(
239            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
240                "scope_name", flat=True
241            )
242        )
243        if len(self.scope) == 0:
244            self.scope = default_scope_names
245            LOGGER.info(
246                "No scopes requested, defaulting to all configured scopes", scopes=self.scope
247            )
248        scopes_to_check = self.scope
249        if github_compat:
250            scopes_to_check = self.scope - SCOPE_GITHUB
251        if not scopes_to_check.issubset(default_scope_names):
252            LOGGER.info(
253                "Application requested scopes not configured, setting to overlap",
254                scope_allowed=default_scope_names,
255                scope_given=self.scope,
256            )
257            self.scope = self.scope.intersection(default_scope_names)
258        if SCOPE_OPENID not in self.scope and (
259            self.grant_type == GrantTypes.HYBRID
260            or self.response_type in [ResponseTypes.ID_TOKEN, ResponseTypes.ID_TOKEN_TOKEN]
261        ):
262            LOGGER.warning("Missing 'openid' scope.")
263            raise AuthorizeError(
264                self.redirect_uri, "invalid_scope", self.grant_type, self.state
265            ).with_cause("scope_openid_missing")
266        if SCOPE_OFFLINE_ACCESS in self.scope:
267            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
268            # Don't explicitly request consent with offline_access, as the spec allows for
269            # "other conditions for processing the request permitting offline access to the
270            # requested resources are in place"
271            # which we interpret as "the admin picks an authorization flow with or without consent"
272            if self.response_type not in [
273                ResponseTypes.CODE,
274                ResponseTypes.CODE_TOKEN,
275                ResponseTypes.CODE_ID_TOKEN,
276                ResponseTypes.CODE_ID_TOKEN_TOKEN,
277            ]:
278                # offline_access requires a response type that has some sort of token
279                # Spec says to ignore the scope when the response_type wouldn't result
280                # in an authorization code being generated
281                self.scope.remove(SCOPE_OFFLINE_ACCESS)

Ensure openid scope is set in Hybrid flows, or when requesting an id_token

def check_nonce(self):
283    def check_nonce(self):
284        """Nonce parameter validation."""
285        # nonce is required for all flows that return an id_token from the authorization endpoint,
286        # see https://openid.net/specs/openid-connect-core-1_0.html#ImplicitAuthRequest or
287        # https://openid.net/specs/openid-connect-core-1_0.html#HybridIDToken and
288        # https://bitbucket.org/openid/connect/issues/972/nonce-requirement-in-hybrid-auth-request
289        if self.response_type not in [
290            ResponseTypes.ID_TOKEN,
291            ResponseTypes.ID_TOKEN_TOKEN,
292            ResponseTypes.CODE_ID_TOKEN,
293            ResponseTypes.CODE_ID_TOKEN_TOKEN,
294        ]:
295            return
296        if SCOPE_OPENID not in self.scope:
297            return
298        if not self.nonce:
299            LOGGER.warning("Missing nonce for OpenID Request")
300            raise AuthorizeError(
301                self.redirect_uri, "invalid_request", self.grant_type, self.state
302            ).with_cause("nonce_missing")

Nonce parameter validation.

def check_code_challenge(self):
304    def check_code_challenge(self):
305        """PKCE validation of the transformation method."""
306        if self.code_challenge and self.code_challenge_method not in [
307            PKCE_METHOD_PLAIN,
308            PKCE_METHOD_S256,
309        ]:
310            raise AuthorizeError(
311                self.redirect_uri,
312                "invalid_request",
313                self.grant_type,
314                self.state,
315                f"Unsupported challenge method {self.code_challenge_method}",
316            )

PKCE validation of the transformation method.

def create_code( self, request: django.http.request.HttpRequest) -> authentik.providers.oauth2.models.AuthorizationCode:
318    def create_code(self, request: HttpRequest) -> AuthorizationCode:
319        """Create an AuthorizationCode object for the request"""
320        auth_event = get_login_event(request)
321
322        now = timezone.now()
323
324        code = AuthorizationCode(
325            user=request.user,
326            provider=self.provider,
327            auth_time=auth_event.created if auth_event else now,
328            code=uuid4().hex,
329            expires=now + timedelta_from_string(self.provider.access_code_validity),
330            scope=self.scope,
331            nonce=self.nonce,
332            session=request.session["authenticatedsession"],
333        )
334
335        if self.code_challenge and self.code_challenge_method:
336            code.code_challenge = self.code_challenge
337            code.code_challenge_method = self.code_challenge_method
338
339        return code

Create an AuthorizationCode object for the request

class AuthorizationFlowInitView(authentik.policies.views.PolicyAccessView):
class AuthorizationFlowInitView(PolicyAccessView):
    """OAuth2 Flow initializer, checks access to application and starts flow"""

    params: OAuthAuthorizationParams
    # Enable GitHub compatibility (only allow for scopes which are handled
    # differently for github compat)
    github_compat = False

    def pre_permission_check(self):
        """Check prompt parameter before checking permission/authentication,
        see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

        Parses and validates the authorization parameters into `self.params`. Validation
        failures are raised as `RequestValidationError` carrying the error response; a
        request without any query parameters results in `Http404`."""
        # Quick sanity check at the beginning to prevent event spamming
        if len(self.request.GET) < 1:
            raise Http404
        try:
            self.params = OAuthAuthorizationParams.from_request(
                self.request, github_compat=self.github_compat
            )
        except AuthorizeError as error:
            LOGGER.warning(error.description, redirect_uri=error.redirect_uri, cause=error.cause)
            raise RequestValidationError(error.get_response(self.request)) from None
        except OAuth2Error as error:
            LOGGER.warning(error.description, cause=error.cause)
            raise RequestValidationError(
                bad_request_message(self.request, error.description, title=error.error)
            ) from None
        except OAuth2Provider.DoesNotExist:
            raise Http404 from None
        if PROMPT_NONE in self.params.prompt and not self.request.user.is_authenticated:
            # When "prompt" is set to "none" but the user is not logged in, show an error message
            error = AuthorizeError(
                self.params.redirect_uri,
                "login_required",
                self.params.grant_type,
                self.params.state,
            )
            raise RequestValidationError(error.get_response(self.request))

    def resolve_provider_application(self):
        """Look up the provider by the `client_id` query parameter (404 if there is none)
        and remember it together with its application."""
        client_id = self.request.GET.get("client_id")
        self.provider = get_object_or_404(OAuth2Provider, client_id=client_id)
        self.application = self.provider.application

    def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
        """Pass a login hint from the query string into the flow context as the pending
        user identifier."""
        if QS_LOGIN_HINT in self.request.GET:
            context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = self.request.GET.get(QS_LOGIN_HINT)
        return super().modify_flow_context(flow, context)

    def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
        """Add the validated OAuth parameters to the policy request context."""
        request.context["oauth_scopes"] = self.params.scope
        request.context["oauth_grant_type"] = self.params.grant_type
        request.context["oauth_code_challenge"] = self.params.code_challenge
        request.context["oauth_code_challenge_method"] = self.params.code_challenge_method
        request.context["oauth_max_age"] = self.params.max_age
        request.context["oauth_redirect_uri"] = self.params.redirect_uri
        request.context["oauth_response_type"] = self.params.response_type
        return request

    def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Activate language from OIDC specific ui_locales parameter, picking the earliest one
        available

        When a language was selected it is also stored in the language cookie of the
        response and, for redirect responses, appended to the target URL as `locale`."""
        selected_language = None
        if UI_LOCALES in self.request.GET:
            languages = str(self.request.GET[UI_LOCALES]).split(" ")
            for language in languages:
                if not translation.check_for_language(language):
                    continue
                try:
                    selected_language = translation.get_supported_language_variant(language)
                except LookupError:
                    # Django raises LookupError when no supported variant exists for the
                    # code; treat it like any other unavailable language and try the next
                    continue
                LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
                break
            translation.activate(selected_language)
        response = super().dispatch(request, *args, **kwargs)
        if selected_language:
            response.set_cookie(
                settings.LANGUAGE_COOKIE_NAME,
                selected_language,
                max_age=settings.LANGUAGE_COOKIE_AGE,
                path=settings.LANGUAGE_COOKIE_PATH,
                domain=settings.LANGUAGE_COOKIE_DOMAIN,
                secure=settings.LANGUAGE_COOKIE_SECURE,
                httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
                samesite=settings.LANGUAGE_COOKIE_SAMESITE,
            )
            if isinstance(response, HttpResponseRedirect):
                parsed_url = urlparse(response.url)
                # Named distinctly so the *args parameter is not shadowed
                query_args = parse_qs(parsed_url.query)
                query_args["locale"] = selected_language
                response["Location"] = urlunparse(
                    parsed_url._replace(query=urlencode(query_args, quote_via=quote, doseq=True))
                )
        return response

    def dispatch(self, request: HttpRequest, *args, **kwargs):
        """Dispatch through `dispatch_with_language`."""
        # Activate language before parsing params (error messages should be localized)
        return self.dispatch_with_language(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Start FlowPLanner, return to flow executor shell

        Before planning, the user is sent through `handle_no_permission()` when there is
        no login event, when the login is older than `max_age`, or when `prompt=login`
        was requested and no re-login has happened yet."""
        # Require a login event to be set, otherwise make the user re-login
        login_event = get_login_event(request)
        if not login_event:
            LOGGER.warning("request with no login event")
            return self.handle_no_permission()
        login_uid = str(login_event.pk)
        # After we've checked permissions, and the user has access, check if we need
        # to re-authenticate the user
        if self.params.max_age:
            # Attempt to check via the session's login event if set, otherwise we can't
            # check
            login_time = login_event.created
            current_age: timedelta = timezone.now() - login_time
            if current_age.total_seconds() > self.params.max_age:
                LOGGER.debug(
                    "Triggering authentication as max_age requirement",
                    max_age=self.params.max_age,
                    ago=int(current_age.total_seconds()),
                )
                # Since we already need to re-authenticate the user, set the old login UID
                # in case this request has both max_age and prompt=login
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        # If prompt=login, we need to re-authenticate the user regardless
        # Check if we're not already doing the re-authentication
        if PROMPT_LOGIN in self.params.prompt:
            # No previous login UID saved, so save the current uid and trigger
            # re-login, or previous login UID matches current one, so no re-login happened yet
            if (
                SESSION_KEY_LAST_LOGIN_UID not in self.request.session
                or login_uid == self.request.session[SESSION_KEY_LAST_LOGIN_UID]
            ):
                self.request.session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
                return self.handle_no_permission()
        scope_descriptions = UserInfoView().get_scope_descriptions(
            self.params.scope, self.params.provider
        )
        # Regardless, we start the planner and return to it
        planner = FlowPlanner(self.provider.authorization_flow)
        planner.allow_empty_flows = True
        try:
            plan = planner.plan(
                self.request,
                {
                    PLAN_CONTEXT_SSO: True,
                    PLAN_CONTEXT_APPLICATION: self.application,
                    # OAuth2 related params
                    PLAN_CONTEXT_PARAMS: self.params,
                    # Consent related params
                    PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
                    % {"application": self.application.name},
                    PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
                },
            )
        except FlowNonApplicableException:
            return self.handle_no_permission_authenticated()
        # OpenID clients can specify a `prompt` parameter, and if its set to consent we
        # need to inject a consent stage
        if PROMPT_CONSENT in self.params.prompt:
            if not any(isinstance(x.stage, ConsentStage) for x in plan.bindings):
                # Plan does not have any consent stage, so we add an in-memory one
                stage = ConsentStage(
                    name="OAuth2 Provider In-memory consent stage",
                    mode=ConsentMode.ALWAYS_REQUIRE,
                )
                plan.append_stage(stage)

        plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

        return plan.to_redirect(
            self.request,
            self.provider.authorization_flow,
            # We can only skip the flow executor and directly go to the final redirect URL if
            #  we can submit the data to the RP via URL
            allowed_silent_types=(
                [OAuthFulfillmentStage]
                if self.params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
                else []
            ),
        )

OAuth2 Flow initializer, checks access to application and starts flow

github_compat = False
def pre_permission_check(self):
    """Parse the authorization request before permissions/authentication are checked.

    The ``prompt`` parameter has to be evaluated first, see
    https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

    On success the parsed parameters are stored on ``self.params``.

    Raises:
        Http404: the request carries no query parameters, or parsing raised
            ``OAuth2Provider.DoesNotExist``.
        RequestValidationError: the parameters could not be parsed, or
            ``prompt=none`` was requested while the user is not authenticated.
    """
    # Reject parameter-less requests right away to prevent event spamming
    if not self.request.GET:
        raise Http404
    try:
        self.params = OAuthAuthorizationParams.from_request(
            self.request, github_compat=self.github_compat
        )
    except AuthorizeError as exc:
        LOGGER.warning(exc.description, redirect_uri=exc.redirect_uri, cause=exc.cause)
        raise RequestValidationError(exc.get_response(self.request)) from None
    except OAuth2Error as exc:
        LOGGER.warning(exc.description, cause=exc.cause)
        error_page = bad_request_message(self.request, exc.description, title=exc.error)
        raise RequestValidationError(error_page) from None
    except OAuth2Provider.DoesNotExist:
        raise Http404 from None
    if PROMPT_NONE not in self.params.prompt or self.request.user.is_authenticated:
        return
    # "prompt=none" forbids showing a login UI, so an anonymous user gets an error response
    login_required = AuthorizeError(
        self.params.redirect_uri,
        "login_required",
        self.params.grant_type,
        self.params.state,
    )
    raise RequestValidationError(login_required.get_response(self.request))

Check prompt parameter before checking permission/authentication, see https://openid.net/specs/openid-connect-core-1_0.html#rfc.section.3.1.2.6

def resolve_provider_application(self):
    """Set ``self.provider`` and ``self.application`` from the ``client_id`` query parameter.

    The provider is looked up with ``get_object_or_404``, so ``Http404`` is
    raised when no provider matches the given ``client_id``.
    """
    provider = get_object_or_404(OAuth2Provider, client_id=self.request.GET.get("client_id"))
    self.provider = provider
    self.application = provider.application

Resolve self.provider and self.application. *.DoesNotExist Exceptions cause a normal AccessDenied view to be shown. An Http404 exception is not caught, and will return directly

def modify_flow_context(self, flow: Flow, context: dict[str, Any]) -> dict[str, Any]:
    """Pre-fill the pending user identifier from the request, then defer to the parent.

    When the ``QS_LOGIN_HINT`` query parameter is present, its value is stored
    under ``PLAN_CONTEXT_PENDING_USER_IDENTIFIER`` in ``context`` before the
    parent implementation is called with the same arguments.
    """
    query = self.request.GET
    if QS_LOGIN_HINT in query:
        context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = query.get(QS_LOGIN_HINT)
    return super().modify_flow_context(flow, context)

optionally modify the flow context which is used for the authentication flow

def modify_policy_request(self, request: PolicyRequest) -> PolicyRequest:
    """Add the parsed OAuth2 parameters to the policy request context.

    The values are read from ``self.params`` and stored under ``oauth_*``
    keys; the same ``request`` object is returned.
    """
    params = self.params
    request.context.update(
        {
            "oauth_scopes": params.scope,
            "oauth_grant_type": params.grant_type,
            "oauth_code_challenge": params.code_challenge,
            "oauth_code_challenge_method": params.code_challenge_method,
            "oauth_max_age": params.max_age,
            "oauth_redirect_uri": params.redirect_uri,
            "oauth_response_type": params.response_type,
        }
    )
    return request

optionally modify the policy request

def dispatch_with_language(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Dispatch the request with the language from the OIDC ``ui_locales`` parameter active.

    ``ui_locales`` is a whitespace-separated list; the earliest entry that
    resolves to a supported language is activated before the parent
    ``dispatch`` runs. If a language was selected, it is also persisted in the
    language cookie and, for redirect responses, appended to the target URL as
    the ``locale`` query parameter.

    Args:
        request: incoming request, passed on to the parent ``dispatch``.
        *args: positional arguments passed on to the parent ``dispatch``.
        **kwargs: keyword arguments passed on to the parent ``dispatch``.

    Returns:
        The response produced by the parent ``dispatch``, possibly amended as
        described above.
    """
    selected_language = None
    # str.split() without a separator also copes with repeated whitespace
    for candidate in str(self.request.GET.get(UI_LOCALES, "")).split():
        if not translation.check_for_language(candidate):
            continue
        try:
            selected_language = translation.get_supported_language_variant(candidate)
        except LookupError:
            # Translation files exist for this code, but no supported variant could be
            # resolved; fall through to the next requested locale instead of failing
            continue
        LOGGER.debug("Activating language from oidc ui_locales", locale=selected_language)
        translation.activate(selected_language)
        break
    response = super().dispatch(request, *args, **kwargs)
    if not selected_language:
        return response
    response.set_cookie(
        settings.LANGUAGE_COOKIE_NAME,
        selected_language,
        max_age=settings.LANGUAGE_COOKIE_AGE,
        path=settings.LANGUAGE_COOKIE_PATH,
        domain=settings.LANGUAGE_COOKIE_DOMAIN,
        secure=settings.LANGUAGE_COOKIE_SECURE,
        httponly=settings.LANGUAGE_COOKIE_HTTPONLY,
        samesite=settings.LANGUAGE_COOKIE_SAMESITE,
    )
    if isinstance(response, HttpResponseRedirect):
        parsed_url = urlparse(response.url)
        # keep_blank_values: rewriting the query must not drop parameters with empty values
        query = parse_qs(parsed_url.query, keep_blank_values=True)
        query["locale"] = selected_language
        response["Location"] = urlunparse(
            parsed_url._replace(query=urlencode(query, quote_via=quote, doseq=True))
        )
    return response

Activate language from OIDC specific ui_locales parameter, picking the earliest one available

def dispatch(self, request: HttpRequest, *args, **kwargs):
    """Entry point of the view; delegates to ``dispatch_with_language``."""
    # The language has to be activated before the parameters are parsed, so that
    # error messages are localized
    response = self.dispatch_with_language(request, *args, **kwargs)
    return response
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Plan the authorization flow and redirect the user into it.

    Before planning, the user is sent through ``handle_no_permission`` when

    * the request has no login event,
    * ``max_age`` is set and the login event is older than that, or
    * ``prompt=login`` is requested and no new login happened since it was
      first seen (tracked via ``SESSION_KEY_LAST_LOGIN_UID`` in the session).

    Returns:
        The response of one of the ``handle_no_permission*`` handlers, or the
        redirect created from the flow plan.
    """
    login_event = get_login_event(request)
    if not login_event:
        # A login event is required, otherwise the user has to log in again
        LOGGER.warning("request with no login event")
        return self.handle_no_permission()
    login_uid = str(login_event.pk)
    params = self.params
    session = self.request.session

    # Permissions have been checked at this point; decide if re-authentication is needed
    if params.max_age:
        login_age: timedelta = timezone.now() - login_event.created
        age_seconds = login_age.total_seconds()
        if age_seconds > params.max_age:
            LOGGER.debug(
                "Triggering authentication as max_age requirement",
                max_age=params.max_age,
                ago=int(age_seconds),
            )
            # Remember the old login UID in case the request carries both
            # max_age and prompt=login
            session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
            return self.handle_no_permission()

    # prompt=login always requires a re-authentication, unless it already happened:
    # either no previous login UID is stored yet, or it still equals the current one
    if PROMPT_LOGIN in params.prompt and (
        SESSION_KEY_LAST_LOGIN_UID not in session
        or session[SESSION_KEY_LAST_LOGIN_UID] == login_uid
    ):
        session[SESSION_KEY_LAST_LOGIN_UID] = login_uid
        return self.handle_no_permission()

    scope_descriptions = UserInfoView().get_scope_descriptions(params.scope, params.provider)
    planner = FlowPlanner(self.provider.authorization_flow)
    planner.allow_empty_flows = True
    plan_context = {
        PLAN_CONTEXT_SSO: True,
        PLAN_CONTEXT_APPLICATION: self.application,
        # OAuth2 related params
        PLAN_CONTEXT_PARAMS: params,
        # Consent related params
        PLAN_CONTEXT_CONSENT_HEADER: _("You're about to sign into %(application)s.")
        % {"application": self.application.name},
        PLAN_CONTEXT_CONSENT_PERMISSIONS: scope_descriptions,
    }
    try:
        plan = planner.plan(self.request, plan_context)
    except FlowNonApplicableException:
        return self.handle_no_permission_authenticated()

    # With prompt=consent a consent stage is mandatory; add an in-memory one if the
    # planned flow does not contain any
    if PROMPT_CONSENT in params.prompt and not any(
        isinstance(binding.stage, ConsentStage) for binding in plan.bindings
    ):
        plan.append_stage(
            ConsentStage(
                name="OAuth2 Provider In-memory consent stage",
                mode=ConsentMode.ALWAYS_REQUIRE,
            )
        )

    plan.append_stage(in_memory_stage(OAuthFulfillmentStage))

    # Skipping the flow executor and going straight to the final redirect URL is only
    # possible when the data can be submitted to the RP via URL
    via_url = params.response_mode in [ResponseMode.QUERY, ResponseMode.FRAGMENT]
    return plan.to_redirect(
        self.request,
        self.provider.authorization_flow,
        allowed_silent_types=[OAuthFulfillmentStage] if via_url else [],
    )

Start FlowPlanner, return to flow executor shell

class OAuthFulfillmentStage(StageView):
    """Final stage of the authorization flow.

    Restores the OAuth2 parameters stored in the flow plan context and delivers
    the authorization response to the client's redirect URI.
    """

    params: OAuthAuthorizationParams
    provider: OAuth2Provider
    application: Application

    def redirect(self, uri: str) -> HttpResponse:
        """Deliver the response encoded in ``uri`` and mark the stage as done.

        For the ``form_post`` response mode the query parameters of ``uri`` are
        sent as an autosubmit challenge posting to ``self.params.redirect_uri``.
        Otherwise an ``HttpResponseRedirectScheme`` is returned which allows the
        scheme of ``uri`` itself, making it compatible with non-http schemes.
        """
        parsed = urlparse(uri)

        if self.params.response_mode != ResponseMode.FORM_POST:
            self.executor.stage_ok()
            return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

        # parse_qs wraps every value in a list, but the autosubmit challenge needs a
        # flat dictionary, so only the first value of each parameter is kept
        query_params = {key: values[0] for key, values in parse_qs(parsed.query).items()}

        # Translate the template first and interpolate afterwards; formatting before
        # the lookup produces a message id that can never match the catalog
        default_title = _("Redirecting to {app}...").format_map({"app": self.application.name})
        challenge = AutosubmitChallenge(
            data={
                "component": "ak-stage-autosubmit",
                "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, default_title),
                "url": self.params.redirect_uri,
                "attrs": query_params,
            }
        )

        challenge.is_valid()
        self.executor.stage_ok()
        return HttpChallengeResponse(
            challenge=challenge,
        )

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Wrapper when this stage gets hit with a post request"""
        return self.get(request, *args, **kwargs)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Finish the OAuth2 flow and send the authorization response.

        Returns a bad request response when the plan context holds no OAuth2
        parameters. Errors raised while building the response are recorded as
        events, mark the stage as invalid and are turned into error responses.
        """
        context = self.executor.plan.context
        if PLAN_CONTEXT_PARAMS not in context:
            LOGGER.warning("Got to fulfillment stage with no pending context")
            return HttpResponseBadRequest()
        self.params: OAuthAuthorizationParams = context.pop(PLAN_CONTEXT_PARAMS)
        self.application: Application = context.pop(PLAN_CONTEXT_APPLICATION)
        self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
        try:
            # At this point we don't need to check permissions anymore
            if {PROMPT_NONE, PROMPT_CONSENT}.issubset(self.params.prompt):
                raise AuthorizeError(
                    self.params.redirect_uri,
                    "consent_required",
                    self.params.grant_type,
                    self.params.state,
                )
            Event.new(
                EventAction.AUTHORIZE_APPLICATION,
                authorized_application=self.application,
                flow=self.executor.plan.flow_pk,
                scopes=" ".join(self.params.scope),
            ).from_http(self.request)
            return self.redirect(self.create_response_uri())
        except (ClientIdError, RedirectUriError) as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()

            return bad_request_message(request, error.description, title=error.error)
        except AuthorizeError as error:
            error.to_event(application=self.application).from_http(request)
            self.executor.stage_invalid()
            return error.get_response(self.request)

    def create_response_uri(self) -> str:
        """Create the final response URI the user is redirected to.

        An authorization code is created and saved for the authorization code
        and hybrid grant types. The response parameters are then attached to
        the redirect URI according to the response mode (query or fragment; for
        ``form_post`` they are placed in the query for ``redirect`` to unpack).

        Raises:
            AuthorizeError: with ``server_error`` when the response cannot be
                created, e.g. for an unknown response mode or when the query
                mode is requested although no authorization code was issued.
        """
        uri = urlsplit(self.params.redirect_uri)

        try:
            code = None

            if self.params.grant_type in [
                GrantTypes.AUTHORIZATION_CODE,
                GrantTypes.HYBRID,
            ]:
                code = self.params.create_code(self.request)
                code.save()

            if self.params.response_mode == ResponseMode.QUERY:
                if code is None:
                    # The query mode only carries a code; without one there is nothing
                    # to return, so fail with a proper error response instead of an
                    # AttributeError
                    raise OAuth2Error()
                query_params = parse_qs(uri.query)
                query_params["code"] = code.code
                query_params["state"] = [str(self.params.state) if self.params.state else ""]

                uri = uri._replace(query=urlencode(query_params, doseq=True))
                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FRAGMENT:
                query_fragment = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    query_fragment["code"] = code.code
                    query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    query_fragment = self.create_implicit_response(code)

                uri = uri._replace(
                    fragment=uri.fragment + urlencode(query_fragment, doseq=True),
                )

                return urlunsplit(uri)

            if self.params.response_mode == ResponseMode.FORM_POST:
                post_params = {}
                if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                    post_params["code"] = code.code
                    post_params["state"] = [str(self.params.state) if self.params.state else ""]
                else:
                    post_params = self.create_implicit_response(code)

                uri = uri._replace(query=urlencode(post_params, doseq=True))

                return urlunsplit(uri)

            raise OAuth2Error()
        except OAuth2Error as error:
            LOGGER.warning("Error when trying to create response uri", error=error)
            raise AuthorizeError(
                self.params.redirect_uri,
                "server_error",
                self.params.grant_type,
                self.params.state,
            ) from None

    def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
        """Create the response parameters for response types that return tokens.

        A new access token is created and saved. Depending on the requested
        response type the result contains ``access_token`` and/or ``id_token``,
        plus ``code`` for the hybrid grant type, and always ``token_type``,
        ``expires_in`` and ``state``.

        Args:
            code: the authorization code issued for this request, if any; it is
                read for the response types containing a code and for the
                hybrid grant type.
        """
        query_fragment = {}
        auth_event = get_login_event(self.request)

        now = timezone.now()
        validity = timedelta_from_string(self.provider.access_token_validity)
        token = AccessToken(
            user=self.request.user,
            scope=self.params.scope,
            expires=now + validity,
            provider=self.provider,
            auth_time=auth_event.created if auth_event else now,
            session=self.request.session["authenticatedsession"],
        )

        id_token = IDToken.new(self.provider, token, self.request)
        id_token.nonce = self.params.nonce

        if self.params.response_type in [
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            id_token.c_hash = code.c_hash
        token.id_token = id_token

        # Check if response_type must include access_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
            ResponseTypes.CODE_TOKEN,
        ]:
            query_fragment["access_token"] = token.token
            # Get at_hash of the current token and update the id_token
            id_token.at_hash = token.at_hash

        # Check if response_type must include id_token in the response.
        if self.params.response_type in [
            ResponseTypes.ID_TOKEN,
            ResponseTypes.ID_TOKEN_TOKEN,
            ResponseTypes.CODE_ID_TOKEN,
            ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ]:
            query_fragment["id_token"] = self.provider.encode(id_token.to_dict())
            token._id_token = dumps(id_token.to_dict())

        token.save()

        # Code parameter must be present if it's Hybrid Flow.
        if self.params.grant_type == GrantTypes.HYBRID:
            query_fragment["code"] = code.code

        query_fragment["token_type"] = TOKEN_TYPE
        query_fragment["expires_in"] = int(validity.total_seconds())
        query_fragment["state"] = self.params.state if self.params.state else ""
        return query_fragment

Final stage, restores params from Flow.

def redirect(self, uri: str) -> HttpResponse:
    """Deliver the response encoded in ``uri`` and mark the stage as done.

    For the ``form_post`` response mode the query parameters of ``uri`` are
    sent as an autosubmit challenge posting to ``self.params.redirect_uri``.
    Otherwise an ``HttpResponseRedirectScheme`` is returned which allows the
    scheme of ``uri`` itself, making it compatible with non-http schemes.

    Args:
        uri: response URI carrying the response parameters.
    """
    parsed = urlparse(uri)

    if self.params.response_mode != ResponseMode.FORM_POST:
        self.executor.stage_ok()
        return HttpResponseRedirectScheme(uri, allowed_schemes=[parsed.scheme])

    # parse_qs wraps every value in a list, but the autosubmit challenge needs a
    # flat dictionary, so only the first value of each parameter is kept
    query_params = {key: values[0] for key, values in parse_qs(parsed.query).items()}

    # Translate the template first and interpolate afterwards; formatting before
    # the lookup produces a message id that can never match the catalog
    default_title = _("Redirecting to {app}...").format_map({"app": self.application.name})
    challenge = AutosubmitChallenge(
        data={
            "component": "ak-stage-autosubmit",
            "title": self.executor.plan.context.get(PLAN_CONTEXT_TITLE, default_title),
            "url": self.params.redirect_uri,
            "attrs": query_params,
        }
    )

    challenge.is_valid()
    self.executor.stage_ok()
    return HttpChallengeResponse(
        challenge=challenge,
    )

Redirect using HttpResponseRedirectScheme, compatible with non-http schemes

def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Handle a POST to this stage exactly like a GET."""
    response = self.get(request, *args, **kwargs)
    return response

Wrapper when this stage gets hit with a post request

def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Finish the OAuth2 flow and send the authorization response.

    The OAuth2 parameters and the application are taken out of the flow plan
    context. A bad request response is returned when the parameters are
    missing. Errors raised while building the response are recorded as events,
    mark the stage as invalid and are turned into error responses.
    """
    plan_context = self.executor.plan.context
    if PLAN_CONTEXT_PARAMS not in plan_context:
        LOGGER.warning("Got to fulfillment stage with no pending context")
        return HttpResponseBadRequest()
    self.params: OAuthAuthorizationParams = plan_context.pop(PLAN_CONTEXT_PARAMS)
    self.application: Application = plan_context.pop(PLAN_CONTEXT_APPLICATION)
    self.provider = get_object_or_404(OAuth2Provider, pk=self.application.provider_id)
    try:
        # Permissions don't need to be checked anymore at this point.
        # "none" and "consent" requested together cannot be satisfied
        prompt = self.params.prompt
        if PROMPT_NONE in prompt and PROMPT_CONSENT in prompt:
            raise AuthorizeError(
                self.params.redirect_uri,
                "consent_required",
                self.params.grant_type,
                self.params.state,
            )
        authorize_event = Event.new(
            EventAction.AUTHORIZE_APPLICATION,
            authorized_application=self.application,
            flow=self.executor.plan.flow_pk,
            scopes=" ".join(self.params.scope),
        )
        authorize_event.from_http(self.request)
        response_uri = self.create_response_uri()
        return self.redirect(response_uri)
    except (ClientIdError, RedirectUriError) as exc:
        exc.to_event(application=self.application).from_http(request)
        self.executor.stage_invalid()
        return bad_request_message(request, exc.description, title=exc.error)
    except AuthorizeError as exc:
        exc.to_event(application=self.application).from_http(request)
        self.executor.stage_invalid()
        return exc.get_response(self.request)

Final stage of an OAuth2 flow

def create_response_uri(self) -> str:
    """Create the final response URI the user is redirected to.

    An authorization code is created and saved for the authorization code and
    hybrid grant types. The response parameters are then attached to the
    redirect URI according to the response mode (query or fragment; for
    ``form_post`` they are placed in the query of the returned URI).

    Raises:
        AuthorizeError: with ``server_error`` when the response cannot be
            created, e.g. for an unknown response mode or when the query mode
            is requested although no authorization code was issued.
    """
    uri = urlsplit(self.params.redirect_uri)

    try:
        code = None

        if self.params.grant_type in [
            GrantTypes.AUTHORIZATION_CODE,
            GrantTypes.HYBRID,
        ]:
            code = self.params.create_code(self.request)
            code.save()

        if self.params.response_mode == ResponseMode.QUERY:
            if code is None:
                # The query mode only carries a code; without one there is nothing
                # to return, so fail with a proper error response instead of an
                # AttributeError
                raise OAuth2Error()
            query_params = parse_qs(uri.query)
            query_params["code"] = code.code
            query_params["state"] = [str(self.params.state) if self.params.state else ""]

            uri = uri._replace(query=urlencode(query_params, doseq=True))
            return urlunsplit(uri)

        if self.params.response_mode == ResponseMode.FRAGMENT:
            query_fragment = {}
            if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                query_fragment["code"] = code.code
                query_fragment["state"] = [str(self.params.state) if self.params.state else ""]
            else:
                query_fragment = self.create_implicit_response(code)

            uri = uri._replace(
                fragment=uri.fragment + urlencode(query_fragment, doseq=True),
            )

            return urlunsplit(uri)

        if self.params.response_mode == ResponseMode.FORM_POST:
            post_params = {}
            if self.params.grant_type in [GrantTypes.AUTHORIZATION_CODE]:
                post_params["code"] = code.code
                post_params["state"] = [str(self.params.state) if self.params.state else ""]
            else:
                post_params = self.create_implicit_response(code)

            uri = uri._replace(query=urlencode(post_params, doseq=True))

            return urlunsplit(uri)

        # None of the known response modes matched
        raise OAuth2Error()
    except OAuth2Error as error:
        LOGGER.warning("Error when trying to create response uri", error=error)
        raise AuthorizeError(
            self.params.redirect_uri,
            "server_error",
            self.params.grant_type,
            self.params.state,
        ) from None

Create a final Response URI the user is redirected to.

def create_implicit_response(self, code: AuthorizationCode | None) -> dict:
    """Build the response parameters for response types that return tokens.

    A new access token is created and saved. Depending on the requested
    response type the result contains ``access_token`` and/or ``id_token``,
    plus ``code`` for the hybrid grant type, and always ``token_type``,
    ``expires_in`` and ``state``.

    Args:
        code: the authorization code issued for this request, if any; it is
            read for the response types containing a code and for the hybrid
            grant type.
    """
    response_type = self.params.response_type
    response = {}
    login_event = get_login_event(self.request)

    issued_at = timezone.now()
    validity = timedelta_from_string(self.provider.access_token_validity)
    token = AccessToken(
        user=self.request.user,
        scope=self.params.scope,
        expires=issued_at + validity,
        provider=self.provider,
        # Fall back to the current time when there is no login event
        auth_time=login_event.created if login_event else issued_at,
        session=self.request.session["authenticatedsession"],
    )

    id_token = IDToken.new(self.provider, token, self.request)
    id_token.nonce = self.params.nonce

    if response_type in (ResponseTypes.CODE_ID_TOKEN, ResponseTypes.CODE_ID_TOKEN_TOKEN):
        id_token.c_hash = code.c_hash
    token.id_token = id_token

    # Response types which have to return the access token itself
    if response_type in (
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
        ResponseTypes.CODE_TOKEN,
    ):
        response["access_token"] = token.token
        # The id_token carries the hash of the access token it is returned with
        id_token.at_hash = token.at_hash

    # Response types which have to return the id_token
    if response_type in (
        ResponseTypes.ID_TOKEN,
        ResponseTypes.ID_TOKEN_TOKEN,
        ResponseTypes.CODE_ID_TOKEN,
        ResponseTypes.CODE_ID_TOKEN_TOKEN,
    ):
        response["id_token"] = self.provider.encode(id_token.to_dict())
        token._id_token = dumps(id_token.to_dict())

    token.save()

    # The hybrid flow additionally has to return the code
    if self.params.grant_type == GrantTypes.HYBRID:
        response["code"] = code.code

    response["token_type"] = TOKEN_TYPE
    response["expires_in"] = int(validity.total_seconds())
    response["state"] = self.params.state or ""
    return response

Create implicit response's URL Fragment dictionary