authentik.providers.oauth2.views.token

authentik OAuth2 Token views

  1"""authentik OAuth2 Token views"""
  2
  3from base64 import b64decode
  4from binascii import Error
  5from dataclasses import InitVar, dataclass
  6from datetime import datetime
  7from hmac import compare_digest
  8from re import error as RegexError
  9from re import fullmatch
 10from typing import Any
 11from urllib.parse import urlparse
 12
 13from django.http import HttpRequest, HttpResponse
 14from django.utils import timezone
 15from django.utils.decorators import method_decorator
 16from django.views import View
 17from django.views.decorators.csrf import csrf_exempt
 18from guardian.shortcuts import get_anonymous_user
 19from jwt import PyJWK, PyJWT, PyJWTError, decode
 20from sentry_sdk import start_span
 21from structlog.stdlib import get_logger
 22
 23from authentik.common.oauth.constants import (
 24    CLIENT_ASSERTION,
 25    CLIENT_ASSERTION_TYPE,
 26    CLIENT_ASSERTION_TYPE_JWT,
 27    GRANT_TYPE_AUTHORIZATION_CODE,
 28    GRANT_TYPE_CLIENT_CREDENTIALS,
 29    GRANT_TYPE_DEVICE_CODE,
 30    GRANT_TYPE_PASSWORD,
 31    GRANT_TYPE_REFRESH_TOKEN,
 32    PKCE_METHOD_S256,
 33    SCOPE_OFFLINE_ACCESS,
 34    TOKEN_TYPE,
 35)
 36from authentik.core.apps import AppAccessWithoutBindings
 37from authentik.core.middleware import CTX_AUTH_VIA
 38from authentik.core.models import (
 39    USER_ATTRIBUTE_EXPIRES,
 40    USER_ATTRIBUTE_GENERATED,
 41    USER_PATH_SYSTEM_PREFIX,
 42    USERNAME_MAX_LENGTH,
 43    Application,
 44    Token,
 45    TokenIntents,
 46    User,
 47    UserTypes,
 48)
 49from authentik.core.sources.mapper import SourceMapper
 50from authentik.events.middleware import audit_ignore
 51from authentik.events.models import Event, EventAction
 52from authentik.events.signals import get_login_event
 53from authentik.flows.planner import PLAN_CONTEXT_APPLICATION
 54from authentik.lib.utils.time import timedelta_from_string
 55from authentik.policies.engine import PolicyEngine
 56from authentik.providers.oauth2.errors import DeviceCodeError, TokenError, UserAuthError
 57from authentik.providers.oauth2.id_token import IDToken
 58from authentik.providers.oauth2.models import (
 59    AccessToken,
 60    AuthorizationCode,
 61    ClientType,
 62    DeviceToken,
 63    OAuth2Provider,
 64    RedirectURIMatchingMode,
 65    RefreshToken,
 66    ScopeMapping,
 67)
 68from authentik.providers.oauth2.utils import (
 69    TokenResponse,
 70    cors_allow,
 71    extract_client_auth,
 72    pkce_s256_challenge,
 73)
 74from authentik.providers.oauth2.views.authorize import FORBIDDEN_URI_SCHEMES
 75from authentik.sources.oauth.models import OAuthSource
 76from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 77
 78LOGGER = get_logger()
 79
 80
@dataclass(slots=True)
class TokenParams:
    """Token params

    Holds the parameters of a single token request together with the
    objects resolved from them. Validation runs in `__post_init__`, so
    constructing an instance can raise `TokenError`.
    """

    # Client credentials and plain request parameters
    client_id: str
    client_secret: str
    redirect_uri: str
    grant_type: str
    state: str
    # Requested scopes; narrowed to the provider's configured scopes during validation
    scope: set[str]

    # Provider the request is validated against
    provider: OAuth2Provider

    # Filled in during validation, depending on the grant type
    authorization_code: AuthorizationCode | None = None
    refresh_token: RefreshToken | None = None
    device_code: DeviceToken | None = None
    user: User | None = None

    # PKCE code verifier sent with the token request, if any
    code_verifier: str | None = None

    # Init-only values: handed to `__post_init__` and not stored on the instance
    raw_code: InitVar[str] = ""
    raw_token: InitVar[str] = ""
    request: InitVar[HttpRequest | None] = None
104
105    @staticmethod
106    def parse(
107        request: HttpRequest,
108        provider: OAuth2Provider,
109        client_id: str,
110        client_secret: str,
111    ) -> TokenParams:
112        """Parse params for request"""
113        return TokenParams(
114            # Init vars
115            raw_code=request.POST.get("code", ""),
116            raw_token=request.POST.get("refresh_token", ""),
117            request=request,
118            # Regular params
119            provider=provider,
120            client_id=client_id,
121            client_secret=client_secret,
122            redirect_uri=request.POST.get("redirect_uri", ""),
123            grant_type=request.POST.get("grant_type", ""),
124            state=request.POST.get("state", ""),
125            scope=set(request.POST.get("scope", "").split()),
126            # PKCE parameter.
127            code_verifier=request.POST.get("code_verifier"),
128        )
129
130    def __check_scopes(self):
131        allowed_scope_names = set(
132            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
133                "scope_name", flat=True
134            )
135        )
136        scopes_to_check = self.scope
137        if not scopes_to_check.issubset(allowed_scope_names):
138            LOGGER.info(
139                "Application requested scopes not configured, setting to overlap",
140                scope_allowed=allowed_scope_names,
141                scope_given=self.scope,
142            )
143            self.scope = self.scope.intersection(allowed_scope_names)
144
145    def __check_policy_access(self, app: Application, request: HttpRequest, **kwargs):
146        with start_span(
147            op="authentik.providers.oauth2.token.policy",
148        ):
149            user = self.user if self.user else get_anonymous_user()
150            engine = PolicyEngine(app, user, request)
151            engine.empty_result = AppAccessWithoutBindings.get()
152            # Don't cache as for client_credentials flows the user will not be set
153            # so we'll get generic cache results
154            engine.use_cache = False
155            engine.request.context["oauth_scopes"] = self.scope
156            engine.request.context["oauth_grant_type"] = self.grant_type
157            engine.request.context["oauth_code_verifier"] = self.code_verifier
158            engine.request.context.update(kwargs)
159            engine.build()
160            result = engine.result
161            if not result.passing:
162                LOGGER.info(
163                    "User not authenticated for application", user=self.user, app_slug=app.slug
164                )
165                raise TokenError("invalid_grant")
166
167    def __post_init__(self, raw_code: str, raw_token: str, request: HttpRequest):
168        if self.grant_type not in self.provider.grant_types:
169            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
170            raise TokenError("invalid_grant").with_cause("grant_type_not_configured")
171
172        # Confidential clients MUST authenticate to the token endpoint per
173        # RFC 6749 §2.3.1. The device code grant (RFC 8628 §3.4) inherits
174        # that requirement - the device_code alone is not a substitute for
175        # client credentials.
176        if self.grant_type in [
177            GRANT_TYPE_AUTHORIZATION_CODE,
178            GRANT_TYPE_REFRESH_TOKEN,
179            GRANT_TYPE_DEVICE_CODE,
180        ]:
181            if self.provider.client_type == ClientType.CONFIDENTIAL and not compare_digest(
182                self.provider.client_secret, self.client_secret
183            ):
184                LOGGER.warning(
185                    "Invalid client secret",
186                    client_id=self.provider.client_id,
187                )
188                raise TokenError("invalid_client")
189        self.__check_scopes()
190        if self.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
191            with start_span(
192                op="authentik.providers.oauth2.post.parse.code",
193            ):
194                self.__post_init_code(raw_code, request)
195        elif self.grant_type == GRANT_TYPE_REFRESH_TOKEN:
196            with start_span(
197                op="authentik.providers.oauth2.post.parse.refresh",
198            ):
199                self.__post_init_refresh(raw_token, request)
200        elif self.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
201            with start_span(
202                op="authentik.providers.oauth2.post.parse.client_credentials",
203            ):
204                self.__post_init_client_credentials(request)
205        elif self.grant_type == GRANT_TYPE_DEVICE_CODE:
206            with start_span(
207                op="authentik.providers.oauth2.post.parse.device_code",
208            ):
209                self.__post_init_device_code(request)
210        else:
211            LOGGER.warning("Invalid grant type", grant_type=self.grant_type)
212            raise TokenError("unsupported_grant_type")
213
214    def __post_init_code(self, raw_code: str, request: HttpRequest):
215        if not raw_code:
216            LOGGER.warning("Missing authorization code")
217            raise TokenError("invalid_grant")
218
219        self.__check_redirect_uri(request)
220
221        self.authorization_code = AuthorizationCode.objects.filter(code=raw_code).first()
222        if not self.authorization_code:
223            LOGGER.warning("Code does not exist", code=raw_code)
224            raise TokenError("invalid_grant")
225
226        if self.authorization_code.is_expired:
227            LOGGER.warning(
228                "Code is expired",
229                token=raw_code,
230            )
231            raise TokenError("invalid_grant")
232
233        if self.authorization_code.provider != self.provider or self.authorization_code.is_expired:
234            LOGGER.warning("Invalid code: invalid client or code has expired")
235            raise TokenError("invalid_grant")
236
237        # Validate PKCE parameters.
238        if self.authorization_code.code_challenge:
239            # Authorization code had PKCE but we didn't get one
240            if not self.code_verifier:
241                raise TokenError("invalid_grant")
242            if self.authorization_code.code_challenge_method == PKCE_METHOD_S256:
243                new_code_challenge = pkce_s256_challenge(self.code_verifier)
244            else:
245                new_code_challenge = self.code_verifier
246
247            if new_code_challenge != self.authorization_code.code_challenge:
248                LOGGER.warning("Code challenge not matching")
249                raise TokenError("invalid_grant")
250        # Token request had a code_verifier but code did not have a code challenge
251        # Prevent downgrade
252        if not self.authorization_code.code_challenge and self.code_verifier:
253            raise TokenError("invalid_grant")
254
255    def __check_redirect_uri(self, request: HttpRequest):
256        allowed_redirect_urls = self.provider.authorization_redirect_uris
257        # At this point, no provider should have a blank redirect_uri, in case they do
258        # this will check an empty array and raise an error
259
260        match_found = False
261        for allowed in allowed_redirect_urls:
262            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
263                if self.redirect_uri == allowed.url:
264                    match_found = True
265                    break
266            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
267                try:
268                    if fullmatch(allowed.url, self.redirect_uri):
269                        match_found = True
270                        break
271                except RegexError as exc:
272                    LOGGER.warning(
273                        "Failed to parse regular expression",
274                        exc=exc,
275                        url=allowed.url,
276                        provider=self.provider,
277                    )
278                    Event.new(
279                        EventAction.CONFIGURATION_ERROR,
280                        message="Invalid redirect_uri configured",
281                        provider=self.provider,
282                    ).from_http(request)
283        if not match_found:
284            Event.new(
285                EventAction.CONFIGURATION_ERROR,
286                message="Invalid redirect URI used by provider",
287                provider=self.provider,
288                redirect_uri=self.redirect_uri,
289                expected=allowed_redirect_urls,
290            ).from_http(request)
291            raise TokenError("invalid_client")
292
293        # Check against forbidden schemes
294        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
295            raise TokenError("invalid_request")
296
297    def __post_init_refresh(self, raw_token: str, request: HttpRequest):
298        if not raw_token:
299            LOGGER.warning("Missing refresh token")
300            raise TokenError("invalid_grant")
301
302        self.refresh_token = RefreshToken.objects.filter(
303            token=raw_token, provider=self.provider
304        ).first()
305        if not self.refresh_token:
306            LOGGER.warning(
307                "Refresh token does not exist",
308                token=raw_token,
309            )
310            raise TokenError("invalid_grant")
311        if self.refresh_token.is_expired:
312            LOGGER.warning(
313                "Refresh token is expired",
314                token=raw_token,
315            )
316            raise TokenError("invalid_grant")
317        # https://datatracker.ietf.org/doc/html/rfc6749#section-6
318        # Fallback to original token's scopes when none are given
319        if not self.scope:
320            self.scope = self.refresh_token.scope
321        if self.refresh_token.revoked:
322            LOGGER.warning("Refresh token is revoked", token=raw_token)
323            Event.new(
324                action=EventAction.SUSPICIOUS_REQUEST,
325                message="Revoked refresh token was used",
326                token=self.refresh_token,
327                provider=self.refresh_token.provider,
328            ).from_http(request, user=self.refresh_token.user)
329            raise TokenError("invalid_grant")
330
331    def __post_init_client_credentials(self, request: HttpRequest):
332        # client_credentials flow with client assertion
333        if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "":
334            return self.__post_init_client_credentials_jwt(request)
335        # authentik-custom-ish client credentials flow
336        if request.POST.get("username", "") != "":
337            return self.__post_init_client_credentials_creds(
338                request, request.POST.get("username"), request.POST.get("password")
339            )
340        # Standard method which creates an automatic user
341        if self.client_secret == self.provider.client_secret:
342            return self.__post_init_client_credentials_generated(request)
343        # Standard workaround method which stores username:password
344        # as client_secret
345        try:
346            user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":")
347            return self.__post_init_client_credentials_creds(request, user, password)
348        except ValueError, Error:
349            raise TokenError("invalid_grant") from None
350
351    def __post_init_client_credentials_creds(
352        self, request: HttpRequest, username: str, password: str
353    ):
354        # Authenticate user based on credentials
355        user = User.objects.filter(username=username, is_active=True).first()
356        if not user:
357            raise TokenError("invalid_grant")
358        token: Token = Token.objects.filter(
359            key=password, intent=TokenIntents.INTENT_APP_PASSWORD, user=user
360        ).first()
361        if not token or token.user.uid != user.uid:
362            raise TokenError("invalid_grant")
363        self.user = user
364        # Authorize user access
365        app = Application.objects.filter(provider=self.provider).first()
366        if not app or not app.provider:
367            raise TokenError("invalid_grant")
368        self.__check_policy_access(app, request)
369
370        Event.new(
371            action=EventAction.LOGIN,
372            **{
373                PLAN_CONTEXT_METHOD: "token",
374                PLAN_CONTEXT_METHOD_ARGS: {
375                    "identifier": token.identifier,
376                },
377                PLAN_CONTEXT_APPLICATION: app,
378            },
379        ).from_http(request, user=user)
380
381    def __validate_jwt_from_source(
382        self, assertion: str
383    ) -> tuple[dict, OAuthSource] | tuple[None, None]:
384        # Fully decode the JWT without verifying the signature, so we can get access to
385        # the header.
386        # Get the Key ID from the header, and use that to optimize our source query to only find
387        # sources that have a JWK for that Key ID
388        # The Key ID doesn't have a fixed format, but must match between an issued JWT
389        # and whatever is returned by the JWKS endpoint
390        try:
391            decode_unvalidated = PyJWT().decode_complete(
392                assertion, options={"verify_signature": False}
393            )
394        except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
395            LOGGER.warning("failed to parse JWT for kid lookup", exc=exc)
396            raise TokenError("invalid_grant") from None
397        expected_kid = decode_unvalidated["header"].get("kid")
398        fallback_alg = decode_unvalidated["header"].get("alg")
399        token = source = None
400        if not expected_kid or not fallback_alg:
401            return None, None
402        for source in self.provider.jwt_federation_sources.filter(
403            oidc_jwks__keys__contains=[{"kid": expected_kid}]
404        ):
405            LOGGER.debug("verifying JWT with source", source=source.slug)
406            keys = source.oidc_jwks.get("keys", [])
407            for key in keys:
408                if key.get("kid") and key.get("kid") != expected_kid:
409                    continue
410                LOGGER.debug("verifying JWT with key", source=source.slug, key=key.get("kid"))
411                try:
412                    parsed_key = PyJWK.from_dict(key).key
413                    token = decode(
414                        assertion,
415                        parsed_key,
416                        algorithms=[key.get("alg")] if "alg" in key else [fallback_alg],
417                        options={
418                            "verify_aud": False,
419                        },
420                    )
421                # AttributeError is raised when the configured JWK is a private key
422                # and not a public key
423                except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
424                    LOGGER.warning("failed to verify JWT", exc=exc, source=source.slug)
425        if token:
426            LOGGER.info("successfully verified JWT with source", source=source.slug)
427        return token, source
428
429    def __validate_jwt_from_provider(
430        self, assertion: str
431    ) -> tuple[dict, OAuth2Provider] | tuple[None, None]:
432        token = provider = _key = None
433        federated_token = AccessToken.objects.filter(
434            token=assertion, provider__in=self.provider.jwt_federation_providers.all()
435        ).first()
436        if federated_token:
437            _key, _alg = federated_token.provider.jwt_key
438            try:
439                token = decode(
440                    assertion,
441                    _key.public_key(),
442                    algorithms=[_alg],
443                    options={
444                        "verify_aud": False,
445                    },
446                )
447                provider = federated_token.provider
448                self.user = federated_token.user
449            except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
450                LOGGER.warning(
451                    "failed to verify JWT", exc=exc, provider=federated_token.provider.name
452                )
453
454        if token:
455            LOGGER.info("successfully verified JWT with provider", provider=provider.name)
456        return token, provider
457
458    def __post_init_client_credentials_jwt(self, request: HttpRequest):
459        assertion_type = request.POST.get(CLIENT_ASSERTION_TYPE, "")
460        if assertion_type != CLIENT_ASSERTION_TYPE_JWT:
461            LOGGER.warning("Invalid assertion type", assertion_type=assertion_type)
462            raise TokenError("invalid_grant")
463
464        client_secret = request.POST.get("client_secret", None)
465        assertion = request.POST.get(CLIENT_ASSERTION, client_secret)
466        if not assertion:
467            LOGGER.warning("Missing client assertion")
468            raise TokenError("invalid_grant")
469
470        source = provider = None
471
472        token, source = self.__validate_jwt_from_source(assertion)
473        if not token:
474            token, provider = self.__validate_jwt_from_provider(assertion)
475
476        if not token:
477            LOGGER.warning("No token could be verified")
478            raise TokenError("invalid_grant")
479
480        if "exp" in token:
481            exp = datetime.fromtimestamp(token["exp"])
482            # Non-timezone aware check since we assume `exp` is in UTC
483            if datetime.now() >= exp:
484                LOGGER.info("JWT token expired")
485                raise TokenError("invalid_grant")
486
487        app = Application.objects.filter(provider=self.provider).first()
488        if not app or not app.provider:
489            LOGGER.info("client_credentials grant for provider without application")
490            raise TokenError("invalid_grant")
491
492        self.__check_policy_access(app, request, oauth_jwt=token)
493        if not provider:
494            self.__create_user_from_jwt(token, app, source, request)
495
496        method_args = {
497            "jwt": token,
498        }
499        if source:
500            method_args["source"] = source
501        if provider:
502            method_args["provider"] = provider
503        Event.new(
504            action=EventAction.LOGIN,
505            **{
506                PLAN_CONTEXT_METHOD: "jwt",
507                PLAN_CONTEXT_METHOD_ARGS: method_args,
508                PLAN_CONTEXT_APPLICATION: app,
509            },
510        ).from_http(request, user=self.user)
511
512    def __post_init_client_credentials_generated(self, request: HttpRequest):
513        # Authorize user access
514        app = Application.objects.filter(provider=self.provider).first()
515        if not app or not app.provider:
516            raise TokenError("invalid_grant")
517        with audit_ignore():
518            self.user, _ = User.objects.update_or_create(
519                # trim username to ensure the entire username is max 150 chars
520                # (22 chars being the length of the "template")
521                username=f"ak-{self.provider.name[: USERNAME_MAX_LENGTH - 22]}-client_credentials",
522                defaults={
523                    "last_login": timezone.now(),
524                    "name": f"Autogenerated user from application {app.name} (client credentials)",
525                    "path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}",
526                    "type": UserTypes.SERVICE_ACCOUNT,
527                },
528            )
529            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
530            self.user.save()
531        self.__check_policy_access(app, request)
532
533        Event.new(
534            action=EventAction.LOGIN,
535            **{
536                PLAN_CONTEXT_METHOD: "oauth_client_secret",
537                PLAN_CONTEXT_APPLICATION: app,
538            },
539        ).from_http(request, user=self.user)
540
541    def __post_init_device_code(self, request: HttpRequest):
542        device_code = request.POST.get("device_code", "")
543        code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first()
544        if not code:
545            raise TokenError("invalid_grant")
546        self.device_code = code
547
548    def __create_user_from_jwt(
549        self, token: dict[str, Any], app: Application, source: OAuthSource, request: HttpRequest
550    ):
551        """Create user from JWT"""
552        with audit_ignore():
553            # Run the JWT payload through the core mapping engine
554            mapped = SourceMapper(source).build_object_properties(
555                User, request=request, info=token, oauth_userinfo=token
556            )
557
558            self.user, created = User.objects.update_or_create(
559                username=mapped.get("username", f"{self.provider.name}-{token.get('sub')}")[
560                    :USERNAME_MAX_LENGTH
561                ],
562                defaults={
563                    "last_login": timezone.now(),
564                    "name": mapped.get(
565                        "name",
566                        f"Autogenerated user from application {app.name} (client credentials JWT)",
567                    ),
568                    "email": mapped.get("email", ""),
569                    "path": source.get_user_path(),
570                    "type": UserTypes.SERVICE_ACCOUNT,
571                    "attributes": mapped.get("attributes", {}),
572                },
573            )
574            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
575            self.user.save()
576            exp = token.get("exp")
577            if created and exp:
578                self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp
579                self.user.save()
580
581
@method_decorator(csrf_exempt, name="dispatch")
class TokenView(View):
    """Generate tokens for clients"""

    # Provider looked up from the request's client_id in `post`; None until then
    provider: OAuth2Provider | None = None
    # Parsed and validated request parameters, set in `post`
    params: TokenParams | None = None
    # Classes `post` uses to parse the parameters and to look up the provider
    params_class = TokenParams
    provider_class = OAuth2Provider
590
591    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
592        response = super().dispatch(request, *args, **kwargs)
593        allowed_origins = []
594        if self.provider:
595            allowed_origins = [x.url for x in self.provider.redirect_uris]
596        cors_allow(self.request, response, *allowed_origins)
597        return response
598
599    def options(self, request: HttpRequest) -> HttpResponse:
600        return TokenResponse({})
601
602    def post(self, request: HttpRequest) -> HttpResponse:
603        """Generate tokens for clients"""
604        try:
605            with start_span(
606                op="authentik.providers.oauth2.post.parse",
607            ):
608                client_id, client_secret = extract_client_auth(request)
609                self.provider = self.provider_class.objects.filter(client_id=client_id).first()
610                if not self.provider:
611                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
612                    raise TokenError("invalid_client")
613                self.params = self.params_class.parse(
614                    request, self.provider, client_id, client_secret
615                )
616                CTX_AUTH_VIA.set("oauth_client_secret")
617
618            with start_span(
619                op="authentik.providers.oauth2.post.response",
620            ):
621                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
622                    LOGGER.debug("Converting authorization code to access token")
623                    return TokenResponse(self.create_code_response())
624                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
625                    LOGGER.debug("Refreshing refresh token")
626                    return TokenResponse(self.create_refresh_response())
627                if self.params.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
628                    LOGGER.debug("Client credentials/password grant")
629                    return TokenResponse(self.create_client_credentials_response())
630                if self.params.grant_type == GRANT_TYPE_DEVICE_CODE:
631                    LOGGER.debug("Device code grant")
632                    return TokenResponse(self.create_device_code_response())
633                raise TokenError("unsupported_grant_type")
634        except (TokenError, DeviceCodeError) as error:
635            return TokenResponse(error.create_dict(request), status=400)
636        except UserAuthError as error:
637            return TokenResponse(error.create_dict(request), status=403)
638
639    def create_code_response(self) -> dict[str, Any]:
640        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1"""
641        now = timezone.now()
642        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
643        access_token = AccessToken(
644            provider=self.provider,
645            user=self.params.authorization_code.user,
646            expires=access_token_expiry,
647            # Keep same scopes as previous token
648            scope=self.params.authorization_code.scope,
649            auth_time=self.params.authorization_code.auth_time,
650            session=self.params.authorization_code.session,
651        )
652        access_id_token = IDToken.new(
653            self.provider,
654            access_token,
655            self.request,
656        )
657        access_id_token.nonce = self.params.authorization_code.nonce
658        access_token.id_token = access_id_token
659        access_token.save()
660
661        response = {
662            "access_token": access_token.token,
663            "token_type": TOKEN_TYPE,
664            "scope": " ".join(access_token.scope),
665            "expires_in": int(
666                timedelta_from_string(self.provider.access_token_validity).total_seconds()
667            ),
668            "id_token": access_token.id_token.to_jwt(self.provider),
669        }
670
671        if SCOPE_OFFLINE_ACCESS in self.params.authorization_code.scope:
672            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
673            refresh_token = RefreshToken(
674                user=self.params.authorization_code.user,
675                scope=self.params.authorization_code.scope,
676                expires=refresh_token_expiry,
677                provider=self.provider,
678                auth_time=self.params.authorization_code.auth_time,
679                session=self.params.authorization_code.session,
680            )
681            id_token = IDToken.new(
682                self.provider,
683                refresh_token,
684                self.request,
685            )
686            id_token.nonce = self.params.authorization_code.nonce
687            id_token.at_hash = access_token.at_hash
688            refresh_token.id_token = id_token
689            refresh_token.save()
690            response["refresh_token"] = refresh_token.token
691
692        # Delete old code
693        self.params.authorization_code.delete()
694        return response
695
696    def create_refresh_response(self) -> dict[str, Any]:
697        """See https://datatracker.ietf.org/doc/html/rfc6749#section-6"""
698        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
699        if unauthorized_scopes:
700            raise TokenError("invalid_scope")
701        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
702            raise TokenError("invalid_scope")
703        now = timezone.now()
704        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
705        access_token = AccessToken(
706            provider=self.provider,
707            user=self.params.refresh_token.user,
708            expires=access_token_expiry,
709            # Keep same scopes as previous token
710            scope=self.params.refresh_token.scope,
711            auth_time=self.params.refresh_token.auth_time,
712            session=self.params.refresh_token.session,
713        )
714        access_token.id_token = IDToken.new(
715            self.provider,
716            access_token,
717            self.request,
718        )
719        access_token.save()
720
721        res = {
722            "access_token": access_token.token,
723            "token_type": TOKEN_TYPE,
724            "scope": " ".join(access_token.scope),
725            "expires_in": int(
726                timedelta_from_string(self.provider.access_token_validity).total_seconds()
727            ),
728            "id_token": access_token.id_token.to_jwt(self.provider),
729        }
730
731        refresh_token_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
732        if (
733            refresh_token_threshold.total_seconds() == 0
734            or (self.params.refresh_token.expires - now) < refresh_token_threshold
735        ):
736            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
737            refresh_token = RefreshToken(
738                user=self.params.refresh_token.user,
739                scope=self.params.refresh_token.scope,
740                expires=refresh_token_expiry,
741                provider=self.provider,
742                auth_time=self.params.refresh_token.auth_time,
743                session=self.params.refresh_token.session,
744            )
745            id_token = IDToken.new(
746                self.provider,
747                refresh_token,
748                self.request,
749            )
750            id_token.nonce = self.params.refresh_token.id_token.nonce
751            id_token.at_hash = access_token.at_hash
752            refresh_token.id_token = id_token
753            refresh_token.save()
754
755            # Mark old token as revoked
756            self.params.refresh_token.revoked = True
757            self.params.refresh_token.save()
758            res["refresh_token"] = refresh_token.token
759
760        return res
761
762    def create_client_credentials_response(self) -> dict[str, Any]:
763        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4"""
764        now = timezone.now()
765        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
766        access_token = AccessToken(
767            provider=self.provider,
768            user=self.params.user,
769            expires=access_token_expiry,
770            scope=self.params.scope,
771            auth_time=now,
772        )
773        access_token.id_token = IDToken.new(
774            self.provider,
775            access_token,
776            self.request,
777        )
778        access_token.save()
779        return {
780            "access_token": access_token.token,
781            "token_type": TOKEN_TYPE,
782            "scope": " ".join(access_token.scope),
783            "expires_in": int(
784                timedelta_from_string(self.provider.access_token_validity).total_seconds()
785            ),
786            "id_token": access_token.id_token.to_jwt(self.provider),
787        }
788
789    def create_device_code_response(self) -> dict[str, Any]:
790        """See https://datatracker.ietf.org/doc/html/rfc8628"""
791        if not self.params.device_code.user:
792            raise DeviceCodeError("authorization_pending")
793        now = timezone.now()
794        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
795        auth_event = get_login_event(self.params.device_code.session)
796        access_token = AccessToken(
797            provider=self.provider,
798            user=self.params.device_code.user,
799            expires=access_token_expiry,
800            scope=self.params.device_code.scope,
801            auth_time=auth_event.created if auth_event else now,
802            session=self.params.device_code.session,
803        )
804        access_token.id_token = IDToken.new(
805            self.provider,
806            access_token,
807            self.request,
808        )
809        access_token.save()
810
811        response = {
812            "access_token": access_token.token,
813            "token_type": TOKEN_TYPE,
814            "scope": " ".join(access_token.scope),
815            "expires_in": int(
816                timedelta_from_string(self.provider.access_token_validity).total_seconds()
817            ),
818            "id_token": access_token.id_token.to_jwt(self.provider),
819        }
820
821        if SCOPE_OFFLINE_ACCESS in self.params.device_code.scope:
822            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
823            refresh_token = RefreshToken(
824                user=self.params.device_code.user,
825                scope=self.params.device_code.scope,
826                expires=refresh_token_expiry,
827                provider=self.provider,
828                auth_time=auth_event.created if auth_event else now,
829            )
830            id_token = IDToken.new(
831                self.provider,
832                refresh_token,
833                self.request,
834            )
835            id_token.at_hash = access_token.at_hash
836            refresh_token.id_token = id_token
837            refresh_token.save()
838            response["refresh_token"] = refresh_token.token
839
840        # Delete device code
841        self.params.device_code.delete()
842        return response
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class TokenParams:
@dataclass(slots=True)
class TokenParams:
    """Token params

    Parameters of a request to the token endpoint. Validation happens in
    `__post_init__`, so constructing an instance raises `TokenError` when the
    request is not acceptable for the given provider and grant type."""

    # Client credentials as extracted from the request by the caller
    client_id: str
    client_secret: str
    # Values of the POST parameters with the same name (empty string when absent)
    redirect_uri: str
    grant_type: str
    state: str
    # Requested scopes; reduced to the scopes configured on the provider during validation
    scope: set[str]

    # Provider that was looked up for `client_id`
    provider: OAuth2Provider

    # Objects resolved during validation, depending on the grant type
    authorization_code: AuthorizationCode | None = None
    refresh_token: RefreshToken | None = None
    device_code: DeviceToken | None = None
    user: User | None = None

    # PKCE code verifier sent by the client, None when not sent
    code_verifier: str | None = None

    # Init-only values: passed to `__post_init__` and not stored on the instance
    raw_code: InitVar[str] = ""
    raw_token: InitVar[str] = ""
    request: InitVar[HttpRequest | None] = None
105
106    @staticmethod
107    def parse(
108        request: HttpRequest,
109        provider: OAuth2Provider,
110        client_id: str,
111        client_secret: str,
112    ) -> TokenParams:
113        """Parse params for request"""
114        return TokenParams(
115            # Init vars
116            raw_code=request.POST.get("code", ""),
117            raw_token=request.POST.get("refresh_token", ""),
118            request=request,
119            # Regular params
120            provider=provider,
121            client_id=client_id,
122            client_secret=client_secret,
123            redirect_uri=request.POST.get("redirect_uri", ""),
124            grant_type=request.POST.get("grant_type", ""),
125            state=request.POST.get("state", ""),
126            scope=set(request.POST.get("scope", "").split()),
127            # PKCE parameter.
128            code_verifier=request.POST.get("code_verifier"),
129        )
130
131    def __check_scopes(self):
132        allowed_scope_names = set(
133            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
134                "scope_name", flat=True
135            )
136        )
137        scopes_to_check = self.scope
138        if not scopes_to_check.issubset(allowed_scope_names):
139            LOGGER.info(
140                "Application requested scopes not configured, setting to overlap",
141                scope_allowed=allowed_scope_names,
142                scope_given=self.scope,
143            )
144            self.scope = self.scope.intersection(allowed_scope_names)
145
146    def __check_policy_access(self, app: Application, request: HttpRequest, **kwargs):
147        with start_span(
148            op="authentik.providers.oauth2.token.policy",
149        ):
150            user = self.user if self.user else get_anonymous_user()
151            engine = PolicyEngine(app, user, request)
152            engine.empty_result = AppAccessWithoutBindings.get()
153            # Don't cache as for client_credentials flows the user will not be set
154            # so we'll get generic cache results
155            engine.use_cache = False
156            engine.request.context["oauth_scopes"] = self.scope
157            engine.request.context["oauth_grant_type"] = self.grant_type
158            engine.request.context["oauth_code_verifier"] = self.code_verifier
159            engine.request.context.update(kwargs)
160            engine.build()
161            result = engine.result
162            if not result.passing:
163                LOGGER.info(
164                    "User not authenticated for application", user=self.user, app_slug=app.slug
165                )
166                raise TokenError("invalid_grant")
167
168    def __post_init__(self, raw_code: str, raw_token: str, request: HttpRequest):
169        if self.grant_type not in self.provider.grant_types:
170            LOGGER.warning("Invalid grant_type for provider", grant_type=self.grant_type)
171            raise TokenError("invalid_grant").with_cause("grant_type_not_configured")
172
173        # Confidential clients MUST authenticate to the token endpoint per
174        # RFC 6749 §2.3.1. The device code grant (RFC 8628 §3.4) inherits
175        # that requirement - the device_code alone is not a substitute for
176        # client credentials.
177        if self.grant_type in [
178            GRANT_TYPE_AUTHORIZATION_CODE,
179            GRANT_TYPE_REFRESH_TOKEN,
180            GRANT_TYPE_DEVICE_CODE,
181        ]:
182            if self.provider.client_type == ClientType.CONFIDENTIAL and not compare_digest(
183                self.provider.client_secret, self.client_secret
184            ):
185                LOGGER.warning(
186                    "Invalid client secret",
187                    client_id=self.provider.client_id,
188                )
189                raise TokenError("invalid_client")
190        self.__check_scopes()
191        if self.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
192            with start_span(
193                op="authentik.providers.oauth2.post.parse.code",
194            ):
195                self.__post_init_code(raw_code, request)
196        elif self.grant_type == GRANT_TYPE_REFRESH_TOKEN:
197            with start_span(
198                op="authentik.providers.oauth2.post.parse.refresh",
199            ):
200                self.__post_init_refresh(raw_token, request)
201        elif self.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
202            with start_span(
203                op="authentik.providers.oauth2.post.parse.client_credentials",
204            ):
205                self.__post_init_client_credentials(request)
206        elif self.grant_type == GRANT_TYPE_DEVICE_CODE:
207            with start_span(
208                op="authentik.providers.oauth2.post.parse.device_code",
209            ):
210                self.__post_init_device_code(request)
211        else:
212            LOGGER.warning("Invalid grant type", grant_type=self.grant_type)
213            raise TokenError("unsupported_grant_type")
214
215    def __post_init_code(self, raw_code: str, request: HttpRequest):
216        if not raw_code:
217            LOGGER.warning("Missing authorization code")
218            raise TokenError("invalid_grant")
219
220        self.__check_redirect_uri(request)
221
222        self.authorization_code = AuthorizationCode.objects.filter(code=raw_code).first()
223        if not self.authorization_code:
224            LOGGER.warning("Code does not exist", code=raw_code)
225            raise TokenError("invalid_grant")
226
227        if self.authorization_code.is_expired:
228            LOGGER.warning(
229                "Code is expired",
230                token=raw_code,
231            )
232            raise TokenError("invalid_grant")
233
234        if self.authorization_code.provider != self.provider or self.authorization_code.is_expired:
235            LOGGER.warning("Invalid code: invalid client or code has expired")
236            raise TokenError("invalid_grant")
237
238        # Validate PKCE parameters.
239        if self.authorization_code.code_challenge:
240            # Authorization code had PKCE but we didn't get one
241            if not self.code_verifier:
242                raise TokenError("invalid_grant")
243            if self.authorization_code.code_challenge_method == PKCE_METHOD_S256:
244                new_code_challenge = pkce_s256_challenge(self.code_verifier)
245            else:
246                new_code_challenge = self.code_verifier
247
248            if new_code_challenge != self.authorization_code.code_challenge:
249                LOGGER.warning("Code challenge not matching")
250                raise TokenError("invalid_grant")
251        # Token request had a code_verifier but code did not have a code challenge
252        # Prevent downgrade
253        if not self.authorization_code.code_challenge and self.code_verifier:
254            raise TokenError("invalid_grant")
255
256    def __check_redirect_uri(self, request: HttpRequest):
257        allowed_redirect_urls = self.provider.authorization_redirect_uris
258        # At this point, no provider should have a blank redirect_uri, in case they do
259        # this will check an empty array and raise an error
260
261        match_found = False
262        for allowed in allowed_redirect_urls:
263            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
264                if self.redirect_uri == allowed.url:
265                    match_found = True
266                    break
267            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
268                try:
269                    if fullmatch(allowed.url, self.redirect_uri):
270                        match_found = True
271                        break
272                except RegexError as exc:
273                    LOGGER.warning(
274                        "Failed to parse regular expression",
275                        exc=exc,
276                        url=allowed.url,
277                        provider=self.provider,
278                    )
279                    Event.new(
280                        EventAction.CONFIGURATION_ERROR,
281                        message="Invalid redirect_uri configured",
282                        provider=self.provider,
283                    ).from_http(request)
284        if not match_found:
285            Event.new(
286                EventAction.CONFIGURATION_ERROR,
287                message="Invalid redirect URI used by provider",
288                provider=self.provider,
289                redirect_uri=self.redirect_uri,
290                expected=allowed_redirect_urls,
291            ).from_http(request)
292            raise TokenError("invalid_client")
293
294        # Check against forbidden schemes
295        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
296            raise TokenError("invalid_request")
297
298    def __post_init_refresh(self, raw_token: str, request: HttpRequest):
299        if not raw_token:
300            LOGGER.warning("Missing refresh token")
301            raise TokenError("invalid_grant")
302
303        self.refresh_token = RefreshToken.objects.filter(
304            token=raw_token, provider=self.provider
305        ).first()
306        if not self.refresh_token:
307            LOGGER.warning(
308                "Refresh token does not exist",
309                token=raw_token,
310            )
311            raise TokenError("invalid_grant")
312        if self.refresh_token.is_expired:
313            LOGGER.warning(
314                "Refresh token is expired",
315                token=raw_token,
316            )
317            raise TokenError("invalid_grant")
318        # https://datatracker.ietf.org/doc/html/rfc6749#section-6
319        # Fallback to original token's scopes when none are given
320        if not self.scope:
321            self.scope = self.refresh_token.scope
322        if self.refresh_token.revoked:
323            LOGGER.warning("Refresh token is revoked", token=raw_token)
324            Event.new(
325                action=EventAction.SUSPICIOUS_REQUEST,
326                message="Revoked refresh token was used",
327                token=self.refresh_token,
328                provider=self.refresh_token.provider,
329            ).from_http(request, user=self.refresh_token.user)
330            raise TokenError("invalid_grant")
331
332    def __post_init_client_credentials(self, request: HttpRequest):
333        # client_credentials flow with client assertion
334        if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "":
335            return self.__post_init_client_credentials_jwt(request)
336        # authentik-custom-ish client credentials flow
337        if request.POST.get("username", "") != "":
338            return self.__post_init_client_credentials_creds(
339                request, request.POST.get("username"), request.POST.get("password")
340            )
341        # Standard method which creates an automatic user
342        if self.client_secret == self.provider.client_secret:
343            return self.__post_init_client_credentials_generated(request)
344        # Standard workaround method which stores username:password
345        # as client_secret
346        try:
347            user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":")
348            return self.__post_init_client_credentials_creds(request, user, password)
349        except ValueError, Error:
350            raise TokenError("invalid_grant") from None
351
352    def __post_init_client_credentials_creds(
353        self, request: HttpRequest, username: str, password: str
354    ):
355        # Authenticate user based on credentials
356        user = User.objects.filter(username=username, is_active=True).first()
357        if not user:
358            raise TokenError("invalid_grant")
359        token: Token = Token.objects.filter(
360            key=password, intent=TokenIntents.INTENT_APP_PASSWORD, user=user
361        ).first()
362        if not token or token.user.uid != user.uid:
363            raise TokenError("invalid_grant")
364        self.user = user
365        # Authorize user access
366        app = Application.objects.filter(provider=self.provider).first()
367        if not app or not app.provider:
368            raise TokenError("invalid_grant")
369        self.__check_policy_access(app, request)
370
371        Event.new(
372            action=EventAction.LOGIN,
373            **{
374                PLAN_CONTEXT_METHOD: "token",
375                PLAN_CONTEXT_METHOD_ARGS: {
376                    "identifier": token.identifier,
377                },
378                PLAN_CONTEXT_APPLICATION: app,
379            },
380        ).from_http(request, user=user)
381
382    def __validate_jwt_from_source(
383        self, assertion: str
384    ) -> tuple[dict, OAuthSource] | tuple[None, None]:
385        # Fully decode the JWT without verifying the signature, so we can get access to
386        # the header.
387        # Get the Key ID from the header, and use that to optimize our source query to only find
388        # sources that have a JWK for that Key ID
389        # The Key ID doesn't have a fixed format, but must match between an issued JWT
390        # and whatever is returned by the JWKS endpoint
391        try:
392            decode_unvalidated = PyJWT().decode_complete(
393                assertion, options={"verify_signature": False}
394            )
395        except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
396            LOGGER.warning("failed to parse JWT for kid lookup", exc=exc)
397            raise TokenError("invalid_grant") from None
398        expected_kid = decode_unvalidated["header"].get("kid")
399        fallback_alg = decode_unvalidated["header"].get("alg")
400        token = source = None
401        if not expected_kid or not fallback_alg:
402            return None, None
403        for source in self.provider.jwt_federation_sources.filter(
404            oidc_jwks__keys__contains=[{"kid": expected_kid}]
405        ):
406            LOGGER.debug("verifying JWT with source", source=source.slug)
407            keys = source.oidc_jwks.get("keys", [])
408            for key in keys:
409                if key.get("kid") and key.get("kid") != expected_kid:
410                    continue
411                LOGGER.debug("verifying JWT with key", source=source.slug, key=key.get("kid"))
412                try:
413                    parsed_key = PyJWK.from_dict(key).key
414                    token = decode(
415                        assertion,
416                        parsed_key,
417                        algorithms=[key.get("alg")] if "alg" in key else [fallback_alg],
418                        options={
419                            "verify_aud": False,
420                        },
421                    )
422                # AttributeError is raised when the configured JWK is a private key
423                # and not a public key
424                except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
425                    LOGGER.warning("failed to verify JWT", exc=exc, source=source.slug)
426        if token:
427            LOGGER.info("successfully verified JWT with source", source=source.slug)
428        return token, source
429
430    def __validate_jwt_from_provider(
431        self, assertion: str
432    ) -> tuple[dict, OAuth2Provider] | tuple[None, None]:
433        token = provider = _key = None
434        federated_token = AccessToken.objects.filter(
435            token=assertion, provider__in=self.provider.jwt_federation_providers.all()
436        ).first()
437        if federated_token:
438            _key, _alg = federated_token.provider.jwt_key
439            try:
440                token = decode(
441                    assertion,
442                    _key.public_key(),
443                    algorithms=[_alg],
444                    options={
445                        "verify_aud": False,
446                    },
447                )
448                provider = federated_token.provider
449                self.user = federated_token.user
450            except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
451                LOGGER.warning(
452                    "failed to verify JWT", exc=exc, provider=federated_token.provider.name
453                )
454
455        if token:
456            LOGGER.info("successfully verified JWT with provider", provider=provider.name)
457        return token, provider
458
459    def __post_init_client_credentials_jwt(self, request: HttpRequest):
460        assertion_type = request.POST.get(CLIENT_ASSERTION_TYPE, "")
461        if assertion_type != CLIENT_ASSERTION_TYPE_JWT:
462            LOGGER.warning("Invalid assertion type", assertion_type=assertion_type)
463            raise TokenError("invalid_grant")
464
465        client_secret = request.POST.get("client_secret", None)
466        assertion = request.POST.get(CLIENT_ASSERTION, client_secret)
467        if not assertion:
468            LOGGER.warning("Missing client assertion")
469            raise TokenError("invalid_grant")
470
471        source = provider = None
472
473        token, source = self.__validate_jwt_from_source(assertion)
474        if not token:
475            token, provider = self.__validate_jwt_from_provider(assertion)
476
477        if not token:
478            LOGGER.warning("No token could be verified")
479            raise TokenError("invalid_grant")
480
481        if "exp" in token:
482            exp = datetime.fromtimestamp(token["exp"])
483            # Non-timezone aware check since we assume `exp` is in UTC
484            if datetime.now() >= exp:
485                LOGGER.info("JWT token expired")
486                raise TokenError("invalid_grant")
487
488        app = Application.objects.filter(provider=self.provider).first()
489        if not app or not app.provider:
490            LOGGER.info("client_credentials grant for provider without application")
491            raise TokenError("invalid_grant")
492
493        self.__check_policy_access(app, request, oauth_jwt=token)
494        if not provider:
495            self.__create_user_from_jwt(token, app, source, request)
496
497        method_args = {
498            "jwt": token,
499        }
500        if source:
501            method_args["source"] = source
502        if provider:
503            method_args["provider"] = provider
504        Event.new(
505            action=EventAction.LOGIN,
506            **{
507                PLAN_CONTEXT_METHOD: "jwt",
508                PLAN_CONTEXT_METHOD_ARGS: method_args,
509                PLAN_CONTEXT_APPLICATION: app,
510            },
511        ).from_http(request, user=self.user)
512
513    def __post_init_client_credentials_generated(self, request: HttpRequest):
514        # Authorize user access
515        app = Application.objects.filter(provider=self.provider).first()
516        if not app or not app.provider:
517            raise TokenError("invalid_grant")
518        with audit_ignore():
519            self.user, _ = User.objects.update_or_create(
520                # trim username to ensure the entire username is max 150 chars
521                # (22 chars being the length of the "template")
522                username=f"ak-{self.provider.name[: USERNAME_MAX_LENGTH - 22]}-client_credentials",
523                defaults={
524                    "last_login": timezone.now(),
525                    "name": f"Autogenerated user from application {app.name} (client credentials)",
526                    "path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}",
527                    "type": UserTypes.SERVICE_ACCOUNT,
528                },
529            )
530            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
531            self.user.save()
532        self.__check_policy_access(app, request)
533
534        Event.new(
535            action=EventAction.LOGIN,
536            **{
537                PLAN_CONTEXT_METHOD: "oauth_client_secret",
538                PLAN_CONTEXT_APPLICATION: app,
539            },
540        ).from_http(request, user=self.user)
541
542    def __post_init_device_code(self, request: HttpRequest):
543        device_code = request.POST.get("device_code", "")
544        code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first()
545        if not code:
546            raise TokenError("invalid_grant")
547        self.device_code = code
548
549    def __create_user_from_jwt(
550        self, token: dict[str, Any], app: Application, source: OAuthSource, request: HttpRequest
551    ):
552        """Create user from JWT"""
553        with audit_ignore():
554            # Run the JWT payload through the core mapping engine
555            mapped = SourceMapper(source).build_object_properties(
556                User, request=request, info=token, oauth_userinfo=token
557            )
558
559            self.user, created = User.objects.update_or_create(
560                username=mapped.get("username", f"{self.provider.name}-{token.get('sub')}")[
561                    :USERNAME_MAX_LENGTH
562                ],
563                defaults={
564                    "last_login": timezone.now(),
565                    "name": mapped.get(
566                        "name",
567                        f"Autogenerated user from application {app.name} (client credentials JWT)",
568                    ),
569                    "email": mapped.get("email", ""),
570                    "path": source.get_user_path(),
571                    "type": UserTypes.SERVICE_ACCOUNT,
572                    "attributes": mapped.get("attributes", {}),
573                },
574            )
575            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
576            self.user.save()
577            exp = token.get("exp")
578            if created and exp:
579                self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp
580                self.user.save()

Token params

TokenParams( client_id: str, client_secret: str, redirect_uri: str, grant_type: str, state: str, scope: set[str], provider: authentik.providers.oauth2.models.OAuth2Provider, authorization_code: authentik.providers.oauth2.models.AuthorizationCode | None = None, refresh_token: authentik.providers.oauth2.models.RefreshToken | None = None, device_code: authentik.providers.oauth2.models.DeviceToken | None = None, user: authentik.core.models.User | None = None, code_verifier: str | None = None, raw_code: dataclasses.InitVar[str] = '', raw_token: dataclasses.InitVar[str] = '', request: dataclasses.InitVar[django.http.request.HttpRequest | None] = None)
client_id: str
client_secret: str
redirect_uri: str
grant_type: str
state: str
scope: set[str]
code_verifier: str | None
raw_code: dataclasses.InitVar[str] = ''
raw_token: dataclasses.InitVar[str] = ''
request: dataclasses.InitVar[django.http.request.HttpRequest | None] = None
@staticmethod
def parse( request: django.http.request.HttpRequest, provider: authentik.providers.oauth2.models.OAuth2Provider, client_id: str, client_secret: str) -> TokenParams:
106    @staticmethod
107    def parse(
108        request: HttpRequest,
109        provider: OAuth2Provider,
110        client_id: str,
111        client_secret: str,
112    ) -> TokenParams:
113        """Parse params for request"""
114        return TokenParams(
115            # Init vars
116            raw_code=request.POST.get("code", ""),
117            raw_token=request.POST.get("refresh_token", ""),
118            request=request,
119            # Regular params
120            provider=provider,
121            client_id=client_id,
122            client_secret=client_secret,
123            redirect_uri=request.POST.get("redirect_uri", ""),
124            grant_type=request.POST.get("grant_type", ""),
125            state=request.POST.get("state", ""),
126            scope=set(request.POST.get("scope", "").split()),
127            # PKCE parameter.
128            code_verifier=request.POST.get("code_verifier"),
129        )

Parse params for request

@method_decorator(csrf_exempt, name='dispatch')
class TokenView(django.views.generic.base.View):
@method_decorator(csrf_exempt, name="dispatch")
class TokenView(View):
    """Token endpoint view: turns a supported grant into an access token response.

    ``post`` resolves the provider from the client credentials, parses the
    request into ``params`` and delegates to one ``create_*_response`` method
    per grant type. ``params_class`` and ``provider_class`` are class-level
    hooks that ``post`` uses to parse parameters and look up the provider.
    """

    # Provider matched in ``post``; stays None when the lookup failed.
    provider: OAuth2Provider | None = None
    # Parsed request parameters, set in ``post`` once a provider was found.
    params: TokenParams | None = None
    params_class = TokenParams
    provider_class = OAuth2Provider

    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Dispatch the request as usual, then pass the response through ``cors_allow``.

        When a provider was resolved while handling the request, the URLs of
        its redirect URIs are handed to ``cors_allow`` as the allowed origins;
        otherwise no origins are passed.
        """
        response = super().dispatch(request, *args, **kwargs)
        origins = [uri.url for uri in self.provider.redirect_uris] if self.provider else []
        cors_allow(self.request, response, *origins)
        return response

    def options(self, request: HttpRequest) -> HttpResponse:
        """Answer OPTIONS requests with an empty ``TokenResponse``."""
        empty_payload = {}
        return TokenResponse(empty_payload)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Handle a token request and answer with a token or an error response.

        The client credentials are taken from ``extract_client_auth`` and the
        provider is looked up by ``client_id``. The request is then parsed via
        ``params_class.parse`` and the payload is produced by the handler that
        matches ``grant_type``.

        ``TokenError`` and ``DeviceCodeError`` are turned into a 400 response,
        ``UserAuthError`` into a 403 response.
        """
        try:
            with start_span(op="authentik.providers.oauth2.post.parse"):
                client_id, client_secret = extract_client_auth(request)
                provider = self.provider_class.objects.filter(client_id=client_id).first()
                self.provider = provider
                if not provider:
                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
                    raise TokenError("invalid_client")
                self.params = self.params_class.parse(request, provider, client_id, client_secret)
                CTX_AUTH_VIA.set("oauth_client_secret")

            with start_span(op="authentik.providers.oauth2.post.response"):
                grant_type = self.params.grant_type
                if grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
                    LOGGER.debug("Converting authorization code to access token")
                    payload = self.create_code_response()
                elif grant_type == GRANT_TYPE_REFRESH_TOKEN:
                    LOGGER.debug("Refreshing refresh token")
                    payload = self.create_refresh_response()
                elif grant_type in (GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD):
                    LOGGER.debug("Client credentials/password grant")
                    payload = self.create_client_credentials_response()
                elif grant_type == GRANT_TYPE_DEVICE_CODE:
                    LOGGER.debug("Device code grant")
                    payload = self.create_device_code_response()
                else:
                    raise TokenError("unsupported_grant_type")
                return TokenResponse(payload)
        except (TokenError, DeviceCodeError) as exc:
            return TokenResponse(exc.create_dict(request), status=400)
        except UserAuthError as exc:
            return TokenResponse(exc.create_dict(request), status=403)

    def create_code_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1

        Saves an access token carrying the authorization code's user, scope,
        auth time and session, and additionally a refresh token when the code's
        scope contains ``SCOPE_OFFLINE_ACCESS``. The authorization code is
        deleted before the response dict is returned.
        """
        code = self.params.authorization_code
        issued_at = timezone.now()
        access_validity = timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=code.user,
            expires=issued_at + access_validity,
            # Keep same scopes as previous token
            scope=code.scope,
            auth_time=code.auth_time,
            session=code.session,
        )
        access_id_token = IDToken.new(self.provider, access_token, self.request)
        access_id_token.nonce = code.nonce
        access_token.id_token = access_id_token
        access_token.save()

        response = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(access_validity.total_seconds()),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        if SCOPE_OFFLINE_ACCESS in code.scope:
            refresh_validity = timedelta_from_string(self.provider.refresh_token_validity)
            refresh_token = RefreshToken(
                user=code.user,
                scope=code.scope,
                expires=issued_at + refresh_validity,
                provider=self.provider,
                auth_time=code.auth_time,
                session=code.session,
            )
            refresh_id_token = IDToken.new(self.provider, refresh_token, self.request)
            refresh_id_token.nonce = code.nonce
            refresh_id_token.at_hash = access_token.at_hash
            refresh_token.id_token = refresh_id_token
            refresh_token.save()
            response["refresh_token"] = refresh_token.token

        # Delete old code
        code.delete()
        return response

    def create_refresh_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf.org/doc/html/rfc6749#section-6

        Raises ``TokenError("invalid_scope")`` when the request asks for a
        scope the refresh token does not hold, or when the requested scope
        lacks ``SCOPE_OFFLINE_ACCESS``. A new access token is always saved.
        The refresh token is replaced (and the old one marked revoked) only
        when the provider's refresh token threshold is zero or the old token's
        remaining lifetime is below that threshold.
        """
        previous = self.params.refresh_token
        # The request may not widen the scope beyond what the refresh token holds.
        if not (set(self.params.scope) <= set(previous.scope)):
            raise TokenError("invalid_scope")
        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
            raise TokenError("invalid_scope")
        issued_at = timezone.now()
        access_validity = timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=previous.user,
            expires=issued_at + access_validity,
            # Keep same scopes as previous token
            scope=previous.scope,
            auth_time=previous.auth_time,
            session=previous.session,
        )
        access_token.id_token = IDToken.new(self.provider, access_token, self.request)
        access_token.save()

        res = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(access_validity.total_seconds()),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        rotation_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
        # The remaining lifetime is only evaluated for a non-zero threshold.
        keep_current_token = (
            rotation_threshold.total_seconds() != 0
            and (previous.expires - issued_at) >= rotation_threshold
        )
        if keep_current_token:
            return res

        refresh_validity = timedelta_from_string(self.provider.refresh_token_validity)
        refresh_token = RefreshToken(
            user=previous.user,
            scope=previous.scope,
            expires=issued_at + refresh_validity,
            provider=self.provider,
            auth_time=previous.auth_time,
            session=previous.session,
        )
        refresh_id_token = IDToken.new(self.provider, refresh_token, self.request)
        refresh_id_token.nonce = previous.id_token.nonce
        refresh_id_token.at_hash = access_token.at_hash
        refresh_token.id_token = refresh_id_token
        refresh_token.save()

        # Mark old token as revoked
        previous.revoked = True
        previous.save()
        res["refresh_token"] = refresh_token.token
        return res

    def create_client_credentials_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4

        Saves an access token for ``self.params.user`` with the requested
        scope, using the current time as auth time. No refresh token is
        created here.
        """
        issued_at = timezone.now()
        access_validity = timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=self.params.user,
            expires=issued_at + access_validity,
            scope=self.params.scope,
            auth_time=issued_at,
        )
        access_token.id_token = IDToken.new(self.provider, access_token, self.request)
        access_token.save()
        return {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(access_validity.total_seconds()),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

    def create_device_code_response(self) -> dict[str, Any]:
        """See https://datatracker.ietf.org/doc/html/rfc8628

        Raises ``DeviceCodeError("authorization_pending")`` while the device
        code has no user. Otherwise saves an access token, plus a refresh
        token when the device code's scope contains ``SCOPE_OFFLINE_ACCESS``,
        and deletes the device code before returning the response dict.
        """
        device_code = self.params.device_code
        if not device_code.user:
            raise DeviceCodeError("authorization_pending")
        issued_at = timezone.now()
        access_validity = timedelta_from_string(self.provider.access_token_validity)
        auth_event = get_login_event(device_code.session)
        # Use the login event's creation time when one was found, else the current time.
        auth_time = auth_event.created if auth_event else issued_at
        access_token = AccessToken(
            provider=self.provider,
            user=device_code.user,
            expires=issued_at + access_validity,
            scope=device_code.scope,
            auth_time=auth_time,
            session=device_code.session,
        )
        access_token.id_token = IDToken.new(self.provider, access_token, self.request)
        access_token.save()

        response = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(access_validity.total_seconds()),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        if SCOPE_OFFLINE_ACCESS in device_code.scope:
            refresh_validity = timedelta_from_string(self.provider.refresh_token_validity)
            # NOTE(review): unlike the access token above, no session is passed
            # here - confirm whether that is intentional.
            refresh_token = RefreshToken(
                user=device_code.user,
                scope=device_code.scope,
                expires=issued_at + refresh_validity,
                provider=self.provider,
                auth_time=auth_time,
            )
            refresh_id_token = IDToken.new(self.provider, refresh_token, self.request)
            refresh_id_token.at_hash = access_token.at_hash
            refresh_token.id_token = refresh_id_token
            refresh_token.save()
            response["refresh_token"] = refresh_token.token

        # Delete device code
        device_code.delete()
        return response

Generate tokens for clients

params: TokenParams | None = None
params_class = <class 'TokenParams'>
def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
592    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
593        response = super().dispatch(request, *args, **kwargs)
594        allowed_origins = []
595        if self.provider:
596            allowed_origins = [x.url for x in self.provider.redirect_uris]
597        cors_allow(self.request, response, *allowed_origins)
598        return response
def options( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
600    def options(self, request: HttpRequest) -> HttpResponse:
601        return TokenResponse({})

Handle responding to requests for the OPTIONS HTTP verb.

def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
603    def post(self, request: HttpRequest) -> HttpResponse:
604        """Generate tokens for clients"""
605        try:
606            with start_span(
607                op="authentik.providers.oauth2.post.parse",
608            ):
609                client_id, client_secret = extract_client_auth(request)
610                self.provider = self.provider_class.objects.filter(client_id=client_id).first()
611                if not self.provider:
612                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
613                    raise TokenError("invalid_client")
614                self.params = self.params_class.parse(
615                    request, self.provider, client_id, client_secret
616                )
617                CTX_AUTH_VIA.set("oauth_client_secret")
618
619            with start_span(
620                op="authentik.providers.oauth2.post.response",
621            ):
622                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
623                    LOGGER.debug("Converting authorization code to access token")
624                    return TokenResponse(self.create_code_response())
625                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
626                    LOGGER.debug("Refreshing refresh token")
627                    return TokenResponse(self.create_refresh_response())
628                if self.params.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
629                    LOGGER.debug("Client credentials/password grant")
630                    return TokenResponse(self.create_client_credentials_response())
631                if self.params.grant_type == GRANT_TYPE_DEVICE_CODE:
632                    LOGGER.debug("Device code grant")
633                    return TokenResponse(self.create_device_code_response())
634                raise TokenError("unsupported_grant_type")
635        except (TokenError, DeviceCodeError) as error:
636            return TokenResponse(error.create_dict(request), status=400)
637        except UserAuthError as error:
638            return TokenResponse(error.create_dict(request), status=403)

Generate tokens for clients

def create_code_response(self) -> dict[str, typing.Any]:
640    def create_code_response(self) -> dict[str, Any]:
641        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1"""
642        now = timezone.now()
643        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
644        access_token = AccessToken(
645            provider=self.provider,
646            user=self.params.authorization_code.user,
647            expires=access_token_expiry,
648            # Keep same scopes as previous token
649            scope=self.params.authorization_code.scope,
650            auth_time=self.params.authorization_code.auth_time,
651            session=self.params.authorization_code.session,
652        )
653        access_id_token = IDToken.new(
654            self.provider,
655            access_token,
656            self.request,
657        )
658        access_id_token.nonce = self.params.authorization_code.nonce
659        access_token.id_token = access_id_token
660        access_token.save()
661
662        response = {
663            "access_token": access_token.token,
664            "token_type": TOKEN_TYPE,
665            "scope": " ".join(access_token.scope),
666            "expires_in": int(
667                timedelta_from_string(self.provider.access_token_validity).total_seconds()
668            ),
669            "id_token": access_token.id_token.to_jwt(self.provider),
670        }
671
672        if SCOPE_OFFLINE_ACCESS in self.params.authorization_code.scope:
673            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
674            refresh_token = RefreshToken(
675                user=self.params.authorization_code.user,
676                scope=self.params.authorization_code.scope,
677                expires=refresh_token_expiry,
678                provider=self.provider,
679                auth_time=self.params.authorization_code.auth_time,
680                session=self.params.authorization_code.session,
681            )
682            id_token = IDToken.new(
683                self.provider,
684                refresh_token,
685                self.request,
686            )
687            id_token.nonce = self.params.authorization_code.nonce
688            id_token.at_hash = access_token.at_hash
689            refresh_token.id_token = id_token
690            refresh_token.save()
691            response["refresh_token"] = refresh_token.token
692
693        # Delete old code
694        self.params.authorization_code.delete()
695        return response
def create_refresh_response(self) -> dict[str, typing.Any]:
697    def create_refresh_response(self) -> dict[str, Any]:
698        """See https://datatracker.ietf.org/doc/html/rfc6749#section-6"""
699        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
700        if unauthorized_scopes:
701            raise TokenError("invalid_scope")
702        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
703            raise TokenError("invalid_scope")
704        now = timezone.now()
705        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
706        access_token = AccessToken(
707            provider=self.provider,
708            user=self.params.refresh_token.user,
709            expires=access_token_expiry,
710            # Keep same scopes as previous token
711            scope=self.params.refresh_token.scope,
712            auth_time=self.params.refresh_token.auth_time,
713            session=self.params.refresh_token.session,
714        )
715        access_token.id_token = IDToken.new(
716            self.provider,
717            access_token,
718            self.request,
719        )
720        access_token.save()
721
722        res = {
723            "access_token": access_token.token,
724            "token_type": TOKEN_TYPE,
725            "scope": " ".join(access_token.scope),
726            "expires_in": int(
727                timedelta_from_string(self.provider.access_token_validity).total_seconds()
728            ),
729            "id_token": access_token.id_token.to_jwt(self.provider),
730        }
731
732        refresh_token_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
733        if (
734            refresh_token_threshold.total_seconds() == 0
735            or (self.params.refresh_token.expires - now) < refresh_token_threshold
736        ):
737            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
738            refresh_token = RefreshToken(
739                user=self.params.refresh_token.user,
740                scope=self.params.refresh_token.scope,
741                expires=refresh_token_expiry,
742                provider=self.provider,
743                auth_time=self.params.refresh_token.auth_time,
744                session=self.params.refresh_token.session,
745            )
746            id_token = IDToken.new(
747                self.provider,
748                refresh_token,
749                self.request,
750            )
751            id_token.nonce = self.params.refresh_token.id_token.nonce
752            id_token.at_hash = access_token.at_hash
753            refresh_token.id_token = id_token
754            refresh_token.save()
755
756            # Mark old token as revoked
757            self.params.refresh_token.revoked = True
758            self.params.refresh_token.save()
759            res["refresh_token"] = refresh_token.token
760
761        return res
def create_client_credentials_response(self) -> dict[str, typing.Any]:
763    def create_client_credentials_response(self) -> dict[str, Any]:
764        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4"""
765        now = timezone.now()
766        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
767        access_token = AccessToken(
768            provider=self.provider,
769            user=self.params.user,
770            expires=access_token_expiry,
771            scope=self.params.scope,
772            auth_time=now,
773        )
774        access_token.id_token = IDToken.new(
775            self.provider,
776            access_token,
777            self.request,
778        )
779        access_token.save()
780        return {
781            "access_token": access_token.token,
782            "token_type": TOKEN_TYPE,
783            "scope": " ".join(access_token.scope),
784            "expires_in": int(
785                timedelta_from_string(self.provider.access_token_validity).total_seconds()
786            ),
787            "id_token": access_token.id_token.to_jwt(self.provider),
788        }
def create_device_code_response(self) -> dict[str, typing.Any]:
790    def create_device_code_response(self) -> dict[str, Any]:
791        """See https://datatracker.ietf.org/doc/html/rfc8628"""
792        if not self.params.device_code.user:
793            raise DeviceCodeError("authorization_pending")
794        now = timezone.now()
795        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
796        auth_event = get_login_event(self.params.device_code.session)
797        access_token = AccessToken(
798            provider=self.provider,
799            user=self.params.device_code.user,
800            expires=access_token_expiry,
801            scope=self.params.device_code.scope,
802            auth_time=auth_event.created if auth_event else now,
803            session=self.params.device_code.session,
804        )
805        access_token.id_token = IDToken.new(
806            self.provider,
807            access_token,
808            self.request,
809        )
810        access_token.save()
811
812        response = {
813            "access_token": access_token.token,
814            "token_type": TOKEN_TYPE,
815            "scope": " ".join(access_token.scope),
816            "expires_in": int(
817                timedelta_from_string(self.provider.access_token_validity).total_seconds()
818            ),
819            "id_token": access_token.id_token.to_jwt(self.provider),
820        }
821
822        if SCOPE_OFFLINE_ACCESS in self.params.device_code.scope:
823            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
824            refresh_token = RefreshToken(
825                user=self.params.device_code.user,
826                scope=self.params.device_code.scope,
827                expires=refresh_token_expiry,
828                provider=self.provider,
829                auth_time=auth_event.created if auth_event else now,
830            )
831            id_token = IDToken.new(
832                self.provider,
833                refresh_token,
834                self.request,
835            )
836            id_token.at_hash = access_token.at_hash
837            refresh_token.id_token = id_token
838            refresh_token.save()
839            response["refresh_token"] = refresh_token.token
840
841        # Delete device code
842        self.params.device_code.delete()
843        return response