authentik.providers.oauth2.views.token

authentik OAuth2 Token views

  1"""authentik OAuth2 Token views"""
  2
  3from base64 import b64decode
  4from binascii import Error
  5from dataclasses import InitVar, dataclass
  6from datetime import datetime
  7from hmac import compare_digest
  8from re import error as RegexError
  9from re import fullmatch
 10from typing import Any
 11from urllib.parse import urlparse
 12
 13from django.http import HttpRequest, HttpResponse
 14from django.utils import timezone
 15from django.utils.decorators import method_decorator
 16from django.views import View
 17from django.views.decorators.csrf import csrf_exempt
 18from guardian.shortcuts import get_anonymous_user
 19from jwt import PyJWK, PyJWT, PyJWTError, decode
 20from sentry_sdk import start_span
 21from structlog.stdlib import get_logger
 22
 23from authentik.common.oauth.constants import (
 24    CLIENT_ASSERTION,
 25    CLIENT_ASSERTION_TYPE,
 26    CLIENT_ASSERTION_TYPE_JWT,
 27    GRANT_TYPE_AUTHORIZATION_CODE,
 28    GRANT_TYPE_CLIENT_CREDENTIALS,
 29    GRANT_TYPE_DEVICE_CODE,
 30    GRANT_TYPE_PASSWORD,
 31    GRANT_TYPE_REFRESH_TOKEN,
 32    PKCE_METHOD_S256,
 33    SCOPE_OFFLINE_ACCESS,
 34    TOKEN_TYPE,
 35)
 36from authentik.core.apps import AppAccessWithoutBindings
 37from authentik.core.middleware import CTX_AUTH_VIA
 38from authentik.core.models import (
 39    USER_ATTRIBUTE_EXPIRES,
 40    USER_ATTRIBUTE_GENERATED,
 41    USER_PATH_SYSTEM_PREFIX,
 42    USERNAME_MAX_LENGTH,
 43    Application,
 44    Token,
 45    TokenIntents,
 46    User,
 47    UserTypes,
 48)
 49from authentik.core.sources.mapper import SourceMapper
 50from authentik.events.middleware import audit_ignore
 51from authentik.events.models import Event, EventAction
 52from authentik.events.signals import get_login_event
 53from authentik.flows.planner import PLAN_CONTEXT_APPLICATION
 54from authentik.lib.utils.time import timedelta_from_string
 55from authentik.policies.engine import PolicyEngine
 56from authentik.providers.oauth2.errors import DeviceCodeError, TokenError, UserAuthError
 57from authentik.providers.oauth2.id_token import IDToken
 58from authentik.providers.oauth2.models import (
 59    AccessToken,
 60    AuthorizationCode,
 61    ClientTypes,
 62    DeviceToken,
 63    OAuth2Provider,
 64    RedirectURIMatchingMode,
 65    RefreshToken,
 66    ScopeMapping,
 67)
 68from authentik.providers.oauth2.utils import (
 69    TokenResponse,
 70    cors_allow,
 71    extract_client_auth,
 72    pkce_s256_challenge,
 73)
 74from authentik.providers.oauth2.views.authorize import FORBIDDEN_URI_SCHEMES
 75from authentik.sources.oauth.models import OAuthSource
 76from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 77
 78LOGGER = get_logger()
 79
 80
 81@dataclass(slots=True)
 82class TokenParams:
 83    """Token params"""
 84
 85    client_id: str
 86    client_secret: str
 87    redirect_uri: str
 88    grant_type: str
 89    state: str
 90    scope: set[str]
 91
 92    provider: OAuth2Provider
 93
 94    authorization_code: AuthorizationCode | None = None
 95    refresh_token: RefreshToken | None = None
 96    device_code: DeviceToken | None = None
 97    user: User | None = None
 98
 99    code_verifier: str | None = None
100
101    raw_code: InitVar[str] = ""
102    raw_token: InitVar[str] = ""
103    request: InitVar[HttpRequest | None] = None
104
105    @staticmethod
106    def parse(
107        request: HttpRequest,
108        provider: OAuth2Provider,
109        client_id: str,
110        client_secret: str,
111    ) -> TokenParams:
112        """Parse params for request"""
113        return TokenParams(
114            # Init vars
115            raw_code=request.POST.get("code", ""),
116            raw_token=request.POST.get("refresh_token", ""),
117            request=request,
118            # Regular params
119            provider=provider,
120            client_id=client_id,
121            client_secret=client_secret,
122            redirect_uri=request.POST.get("redirect_uri", ""),
123            grant_type=request.POST.get("grant_type", ""),
124            state=request.POST.get("state", ""),
125            scope=set(request.POST.get("scope", "").split()),
126            # PKCE parameter.
127            code_verifier=request.POST.get("code_verifier"),
128        )
129
130    def __check_scopes(self):
131        allowed_scope_names = set(
132            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
133                "scope_name", flat=True
134            )
135        )
136        scopes_to_check = self.scope
137        if not scopes_to_check.issubset(allowed_scope_names):
138            LOGGER.info(
139                "Application requested scopes not configured, setting to overlap",
140                scope_allowed=allowed_scope_names,
141                scope_given=self.scope,
142            )
143            self.scope = self.scope.intersection(allowed_scope_names)
144
145    def __check_policy_access(self, app: Application, request: HttpRequest, **kwargs):
146        with start_span(
147            op="authentik.providers.oauth2.token.policy",
148        ):
149            user = self.user if self.user else get_anonymous_user()
150            engine = PolicyEngine(app, user, request)
151            engine.empty_result = AppAccessWithoutBindings.get()
152            # Don't cache as for client_credentials flows the user will not be set
153            # so we'll get generic cache results
154            engine.use_cache = False
155            engine.request.context["oauth_scopes"] = self.scope
156            engine.request.context["oauth_grant_type"] = self.grant_type
157            engine.request.context["oauth_code_verifier"] = self.code_verifier
158            engine.request.context.update(kwargs)
159            engine.build()
160            result = engine.result
161            if not result.passing:
162                LOGGER.info(
163                    "User not authenticated for application", user=self.user, app_slug=app.slug
164                )
165                raise TokenError("invalid_grant")
166
167    def __post_init__(self, raw_code: str, raw_token: str, request: HttpRequest):
168        if self.grant_type in [GRANT_TYPE_AUTHORIZATION_CODE, GRANT_TYPE_REFRESH_TOKEN]:
169            if self.provider.client_type == ClientTypes.CONFIDENTIAL and not compare_digest(
170                self.provider.client_secret, self.client_secret
171            ):
172                LOGGER.warning(
173                    "Invalid client secret",
174                    client_id=self.provider.client_id,
175                )
176                raise TokenError("invalid_client")
177        self.__check_scopes()
178        if self.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
179            with start_span(
180                op="authentik.providers.oauth2.post.parse.code",
181            ):
182                self.__post_init_code(raw_code, request)
183        elif self.grant_type == GRANT_TYPE_REFRESH_TOKEN:
184            with start_span(
185                op="authentik.providers.oauth2.post.parse.refresh",
186            ):
187                self.__post_init_refresh(raw_token, request)
188        elif self.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
189            with start_span(
190                op="authentik.providers.oauth2.post.parse.client_credentials",
191            ):
192                self.__post_init_client_credentials(request)
193        elif self.grant_type == GRANT_TYPE_DEVICE_CODE:
194            with start_span(
195                op="authentik.providers.oauth2.post.parse.device_code",
196            ):
197                self.__post_init_device_code(request)
198        else:
199            LOGGER.warning("Invalid grant type", grant_type=self.grant_type)
200            raise TokenError("unsupported_grant_type")
201
202    def __post_init_code(self, raw_code: str, request: HttpRequest):
203        if not raw_code:
204            LOGGER.warning("Missing authorization code")
205            raise TokenError("invalid_grant")
206
207        self.__check_redirect_uri(request)
208
209        self.authorization_code = AuthorizationCode.objects.filter(code=raw_code).first()
210        if not self.authorization_code:
211            LOGGER.warning("Code does not exist", code=raw_code)
212            raise TokenError("invalid_grant")
213
214        if self.authorization_code.is_expired:
215            LOGGER.warning(
216                "Code is expired",
217                token=raw_code,
218            )
219            raise TokenError("invalid_grant")
220
221        if self.authorization_code.provider != self.provider or self.authorization_code.is_expired:
222            LOGGER.warning("Invalid code: invalid client or code has expired")
223            raise TokenError("invalid_grant")
224
225        # Validate PKCE parameters.
226        if self.authorization_code.code_challenge:
227            # Authorization code had PKCE but we didn't get one
228            if not self.code_verifier:
229                raise TokenError("invalid_grant")
230            if self.authorization_code.code_challenge_method == PKCE_METHOD_S256:
231                new_code_challenge = pkce_s256_challenge(self.code_verifier)
232            else:
233                new_code_challenge = self.code_verifier
234
235            if new_code_challenge != self.authorization_code.code_challenge:
236                LOGGER.warning("Code challenge not matching")
237                raise TokenError("invalid_grant")
238        # Token request had a code_verifier but code did not have a code challenge
239        # Prevent downgrade
240        if not self.authorization_code.code_challenge and self.code_verifier:
241            raise TokenError("invalid_grant")
242
243    def __check_redirect_uri(self, request: HttpRequest):
244        allowed_redirect_urls = self.provider.redirect_uris
245        # At this point, no provider should have a blank redirect_uri, in case they do
246        # this will check an empty array and raise an error
247
248        match_found = False
249        for allowed in allowed_redirect_urls:
250            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
251                if self.redirect_uri == allowed.url:
252                    match_found = True
253                    break
254            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
255                try:
256                    if fullmatch(allowed.url, self.redirect_uri):
257                        match_found = True
258                        break
259                except RegexError as exc:
260                    LOGGER.warning(
261                        "Failed to parse regular expression",
262                        exc=exc,
263                        url=allowed.url,
264                        provider=self.provider,
265                    )
266                    Event.new(
267                        EventAction.CONFIGURATION_ERROR,
268                        message="Invalid redirect_uri configured",
269                        provider=self.provider,
270                    ).from_http(request)
271        if not match_found:
272            Event.new(
273                EventAction.CONFIGURATION_ERROR,
274                message="Invalid redirect URI used by provider",
275                provider=self.provider,
276                redirect_uri=self.redirect_uri,
277                expected=allowed_redirect_urls,
278            ).from_http(request)
279            raise TokenError("invalid_client")
280
281        # Check against forbidden schemes
282        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
283            raise TokenError("invalid_request")
284
285    def __post_init_refresh(self, raw_token: str, request: HttpRequest):
286        if not raw_token:
287            LOGGER.warning("Missing refresh token")
288            raise TokenError("invalid_grant")
289
290        self.refresh_token = RefreshToken.objects.filter(
291            token=raw_token, provider=self.provider
292        ).first()
293        if not self.refresh_token:
294            LOGGER.warning(
295                "Refresh token does not exist",
296                token=raw_token,
297            )
298            raise TokenError("invalid_grant")
299        if self.refresh_token.is_expired:
300            LOGGER.warning(
301                "Refresh token is expired",
302                token=raw_token,
303            )
304            raise TokenError("invalid_grant")
305        # https://datatracker.ietf.org/doc/html/rfc6749#section-6
306        # Fallback to original token's scopes when none are given
307        if not self.scope:
308            self.scope = self.refresh_token.scope
309        if self.refresh_token.revoked:
310            LOGGER.warning("Refresh token is revoked", token=raw_token)
311            Event.new(
312                action=EventAction.SUSPICIOUS_REQUEST,
313                message="Revoked refresh token was used",
314                token=self.refresh_token,
315                provider=self.refresh_token.provider,
316            ).from_http(request, user=self.refresh_token.user)
317            raise TokenError("invalid_grant")
318
319    def __post_init_client_credentials(self, request: HttpRequest):
320        # client_credentials flow with client assertion
321        if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "":
322            return self.__post_init_client_credentials_jwt(request)
323        # authentik-custom-ish client credentials flow
324        if request.POST.get("username", "") != "":
325            return self.__post_init_client_credentials_creds(
326                request, request.POST.get("username"), request.POST.get("password")
327            )
328        # Standard method which creates an automatic user
329        if self.client_secret == self.provider.client_secret:
330            return self.__post_init_client_credentials_generated(request)
331        # Standard workaround method which stores username:password
332        # as client_secret
333        try:
334            user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":")
335            return self.__post_init_client_credentials_creds(request, user, password)
336        except ValueError, Error:
337            raise TokenError("invalid_grant") from None
338
339    def __post_init_client_credentials_creds(
340        self, request: HttpRequest, username: str, password: str
341    ):
342        # Authenticate user based on credentials
343        user = User.objects.filter(username=username, is_active=True).first()
344        if not user:
345            raise TokenError("invalid_grant")
346        token: Token = Token.objects.filter(
347            key=password, intent=TokenIntents.INTENT_APP_PASSWORD, user=user
348        ).first()
349        if not token or token.user.uid != user.uid:
350            raise TokenError("invalid_grant")
351        self.user = user
352        # Authorize user access
353        app = Application.objects.filter(provider=self.provider).first()
354        if not app or not app.provider:
355            raise TokenError("invalid_grant")
356        self.__check_policy_access(app, request)
357
358        Event.new(
359            action=EventAction.LOGIN,
360            **{
361                PLAN_CONTEXT_METHOD: "token",
362                PLAN_CONTEXT_METHOD_ARGS: {
363                    "identifier": token.identifier,
364                },
365                PLAN_CONTEXT_APPLICATION: app,
366            },
367        ).from_http(request, user=user)
368
369    def __validate_jwt_from_source(
370        self, assertion: str
371    ) -> tuple[dict, OAuthSource] | tuple[None, None]:
372        # Fully decode the JWT without verifying the signature, so we can get access to
373        # the header.
374        # Get the Key ID from the header, and use that to optimize our source query to only find
375        # sources that have a JWK for that Key ID
376        # The Key ID doesn't have a fixed format, but must match between an issued JWT
377        # and whatever is returned by the JWKS endpoint
378        try:
379            decode_unvalidated = PyJWT().decode_complete(
380                assertion, options={"verify_signature": False}
381            )
382        except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
383            LOGGER.warning("failed to parse JWT for kid lookup", exc=exc)
384            raise TokenError("invalid_grant") from None
385        expected_kid = decode_unvalidated["header"].get("kid")
386        fallback_alg = decode_unvalidated["header"].get("alg")
387        token = source = None
388        if not expected_kid or not fallback_alg:
389            return None, None
390        for source in self.provider.jwt_federation_sources.filter(
391            oidc_jwks__keys__contains=[{"kid": expected_kid}]
392        ):
393            LOGGER.debug("verifying JWT with source", source=source.slug)
394            keys = source.oidc_jwks.get("keys", [])
395            for key in keys:
396                if key.get("kid") and key.get("kid") != expected_kid:
397                    continue
398                LOGGER.debug("verifying JWT with key", source=source.slug, key=key.get("kid"))
399                try:
400                    parsed_key = PyJWK.from_dict(key).key
401                    token = decode(
402                        assertion,
403                        parsed_key,
404                        algorithms=[key.get("alg")] if "alg" in key else [fallback_alg],
405                        options={
406                            "verify_aud": False,
407                        },
408                    )
409                # AttributeError is raised when the configured JWK is a private key
410                # and not a public key
411                except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
412                    LOGGER.warning("failed to verify JWT", exc=exc, source=source.slug)
413        if token:
414            LOGGER.info("successfully verified JWT with source", source=source.slug)
415        return token, source
416
417    def __validate_jwt_from_provider(
418        self, assertion: str
419    ) -> tuple[dict, OAuth2Provider] | tuple[None, None]:
420        token = provider = _key = None
421        federated_token = AccessToken.objects.filter(
422            token=assertion, provider__in=self.provider.jwt_federation_providers.all()
423        ).first()
424        if federated_token:
425            _key, _alg = federated_token.provider.jwt_key
426            try:
427                token = decode(
428                    assertion,
429                    _key.public_key(),
430                    algorithms=[_alg],
431                    options={
432                        "verify_aud": False,
433                    },
434                )
435                provider = federated_token.provider
436                self.user = federated_token.user
437            except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
438                LOGGER.warning(
439                    "failed to verify JWT", exc=exc, provider=federated_token.provider.name
440                )
441
442        if token:
443            LOGGER.info("successfully verified JWT with provider", provider=provider.name)
444        return token, provider
445
446    def __post_init_client_credentials_jwt(self, request: HttpRequest):
447        assertion_type = request.POST.get(CLIENT_ASSERTION_TYPE, "")
448        if assertion_type != CLIENT_ASSERTION_TYPE_JWT:
449            LOGGER.warning("Invalid assertion type", assertion_type=assertion_type)
450            raise TokenError("invalid_grant")
451
452        client_secret = request.POST.get("client_secret", None)
453        assertion = request.POST.get(CLIENT_ASSERTION, client_secret)
454        if not assertion:
455            LOGGER.warning("Missing client assertion")
456            raise TokenError("invalid_grant")
457
458        source = provider = None
459
460        token, source = self.__validate_jwt_from_source(assertion)
461        if not token:
462            token, provider = self.__validate_jwt_from_provider(assertion)
463
464        if not token:
465            LOGGER.warning("No token could be verified")
466            raise TokenError("invalid_grant")
467
468        if "exp" in token:
469            exp = datetime.fromtimestamp(token["exp"])
470            # Non-timezone aware check since we assume `exp` is in UTC
471            if datetime.now() >= exp:
472                LOGGER.info("JWT token expired")
473                raise TokenError("invalid_grant")
474
475        app = Application.objects.filter(provider=self.provider).first()
476        if not app or not app.provider:
477            LOGGER.info("client_credentials grant for provider without application")
478            raise TokenError("invalid_grant")
479
480        self.__check_policy_access(app, request, oauth_jwt=token)
481        if not provider:
482            self.__create_user_from_jwt(token, app, source, request)
483
484        method_args = {
485            "jwt": token,
486        }
487        if source:
488            method_args["source"] = source
489        if provider:
490            method_args["provider"] = provider
491        Event.new(
492            action=EventAction.LOGIN,
493            **{
494                PLAN_CONTEXT_METHOD: "jwt",
495                PLAN_CONTEXT_METHOD_ARGS: method_args,
496                PLAN_CONTEXT_APPLICATION: app,
497            },
498        ).from_http(request, user=self.user)
499
500    def __post_init_client_credentials_generated(self, request: HttpRequest):
501        # Authorize user access
502        app = Application.objects.filter(provider=self.provider).first()
503        if not app or not app.provider:
504            raise TokenError("invalid_grant")
505        with audit_ignore():
506            self.user, _ = User.objects.update_or_create(
507                # trim username to ensure the entire username is max 150 chars
508                # (22 chars being the length of the "template")
509                username=f"ak-{self.provider.name[: USERNAME_MAX_LENGTH - 22]}-client_credentials",
510                defaults={
511                    "last_login": timezone.now(),
512                    "name": f"Autogenerated user from application {app.name} (client credentials)",
513                    "path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}",
514                    "type": UserTypes.SERVICE_ACCOUNT,
515                },
516            )
517            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
518            self.user.save()
519        self.__check_policy_access(app, request)
520
521        Event.new(
522            action=EventAction.LOGIN,
523            **{
524                PLAN_CONTEXT_METHOD: "oauth_client_secret",
525                PLAN_CONTEXT_APPLICATION: app,
526            },
527        ).from_http(request, user=self.user)
528
529    def __post_init_device_code(self, request: HttpRequest):
530        device_code = request.POST.get("device_code", "")
531        code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first()
532        if not code:
533            raise TokenError("invalid_grant")
534        self.device_code = code
535
536    def __create_user_from_jwt(
537        self, token: dict[str, Any], app: Application, source: OAuthSource, request: HttpRequest
538    ):
539        """Create user from JWT"""
540        with audit_ignore():
541            # Run the JWT payload through the core mapping engine
542            mapped = SourceMapper(source).build_object_properties(
543                User, request=request, info=token, oauth_userinfo=token
544            )
545
546            self.user, created = User.objects.update_or_create(
547                username=mapped.get("username", f"{self.provider.name}-{token.get('sub')}")[
548                    :USERNAME_MAX_LENGTH
549                ],
550                defaults={
551                    "last_login": timezone.now(),
552                    "name": mapped.get(
553                        "name",
554                        f"Autogenerated user from application {app.name} (client credentials JWT)",
555                    ),
556                    "email": mapped.get("email", ""),
557                    "path": source.get_user_path(),
558                    "type": UserTypes.SERVICE_ACCOUNT,
559                    "attributes": mapped.get("attributes", {}),
560                },
561            )
562            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
563            self.user.save()
564            exp = token.get("exp")
565            if created and exp:
566                self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp
567                self.user.save()
568
569
570@method_decorator(csrf_exempt, name="dispatch")
571class TokenView(View):
572    """Generate tokens for clients"""
573
574    provider: OAuth2Provider | None = None
575    params: TokenParams | None = None
576    params_class = TokenParams
577    provider_class = OAuth2Provider
578
579    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
580        response = super().dispatch(request, *args, **kwargs)
581        allowed_origins = []
582        if self.provider:
583            allowed_origins = [x.url for x in self.provider.redirect_uris]
584        cors_allow(self.request, response, *allowed_origins)
585        return response
586
587    def options(self, request: HttpRequest) -> HttpResponse:
588        return TokenResponse({})
589
590    def post(self, request: HttpRequest) -> HttpResponse:
591        """Generate tokens for clients"""
592        try:
593            with start_span(
594                op="authentik.providers.oauth2.post.parse",
595            ):
596                client_id, client_secret = extract_client_auth(request)
597                self.provider = self.provider_class.objects.filter(client_id=client_id).first()
598                if not self.provider:
599                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
600                    raise TokenError("invalid_client")
601                CTX_AUTH_VIA.set("oauth_client_secret")
602                self.params = self.params_class.parse(
603                    request, self.provider, client_id, client_secret
604                )
605
606            with start_span(
607                op="authentik.providers.oauth2.post.response",
608            ):
609                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
610                    LOGGER.debug("Converting authorization code to access token")
611                    return TokenResponse(self.create_code_response())
612                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
613                    LOGGER.debug("Refreshing refresh token")
614                    return TokenResponse(self.create_refresh_response())
615                if self.params.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
616                    LOGGER.debug("Client credentials/password grant")
617                    return TokenResponse(self.create_client_credentials_response())
618                if self.params.grant_type == GRANT_TYPE_DEVICE_CODE:
619                    LOGGER.debug("Device code grant")
620                    return TokenResponse(self.create_device_code_response())
621                raise TokenError("unsupported_grant_type")
622        except (TokenError, DeviceCodeError) as error:
623            return TokenResponse(error.create_dict(request), status=400)
624        except UserAuthError as error:
625            return TokenResponse(error.create_dict(request), status=403)
626
627    def create_code_response(self) -> dict[str, Any]:
628        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1"""
629        now = timezone.now()
630        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
631        access_token = AccessToken(
632            provider=self.provider,
633            user=self.params.authorization_code.user,
634            expires=access_token_expiry,
635            # Keep same scopes as previous token
636            scope=self.params.authorization_code.scope,
637            auth_time=self.params.authorization_code.auth_time,
638            session=self.params.authorization_code.session,
639        )
640        access_id_token = IDToken.new(
641            self.provider,
642            access_token,
643            self.request,
644        )
645        access_id_token.nonce = self.params.authorization_code.nonce
646        access_token.id_token = access_id_token
647        access_token.save()
648
649        response = {
650            "access_token": access_token.token,
651            "token_type": TOKEN_TYPE,
652            "scope": " ".join(access_token.scope),
653            "expires_in": int(
654                timedelta_from_string(self.provider.access_token_validity).total_seconds()
655            ),
656            "id_token": access_token.id_token.to_jwt(self.provider),
657        }
658
659        if SCOPE_OFFLINE_ACCESS in self.params.authorization_code.scope:
660            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
661            refresh_token = RefreshToken(
662                user=self.params.authorization_code.user,
663                scope=self.params.authorization_code.scope,
664                expires=refresh_token_expiry,
665                provider=self.provider,
666                auth_time=self.params.authorization_code.auth_time,
667                session=self.params.authorization_code.session,
668            )
669            id_token = IDToken.new(
670                self.provider,
671                refresh_token,
672                self.request,
673            )
674            id_token.nonce = self.params.authorization_code.nonce
675            id_token.at_hash = access_token.at_hash
676            refresh_token.id_token = id_token
677            refresh_token.save()
678            response["refresh_token"] = refresh_token.token
679
680        # Delete old code
681        self.params.authorization_code.delete()
682        return response
683
684    def create_refresh_response(self) -> dict[str, Any]:
685        """See https://datatracker.ietf.org/doc/html/rfc6749#section-6"""
686        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
687        if unauthorized_scopes:
688            raise TokenError("invalid_scope")
689        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
690            raise TokenError("invalid_scope")
691        now = timezone.now()
692        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
693        access_token = AccessToken(
694            provider=self.provider,
695            user=self.params.refresh_token.user,
696            expires=access_token_expiry,
697            # Keep same scopes as previous token
698            scope=self.params.refresh_token.scope,
699            auth_time=self.params.refresh_token.auth_time,
700            session=self.params.refresh_token.session,
701        )
702        access_token.id_token = IDToken.new(
703            self.provider,
704            access_token,
705            self.request,
706        )
707        access_token.save()
708
709        res = {
710            "access_token": access_token.token,
711            "token_type": TOKEN_TYPE,
712            "scope": " ".join(access_token.scope),
713            "expires_in": int(
714                timedelta_from_string(self.provider.access_token_validity).total_seconds()
715            ),
716            "id_token": access_token.id_token.to_jwt(self.provider),
717        }
718
719        refresh_token_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
720        if (
721            refresh_token_threshold.total_seconds() == 0
722            or (now - self.params.refresh_token.expires) > refresh_token_threshold
723        ):
724            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
725            refresh_token = RefreshToken(
726                user=self.params.refresh_token.user,
727                scope=self.params.refresh_token.scope,
728                expires=refresh_token_expiry,
729                provider=self.provider,
730                auth_time=self.params.refresh_token.auth_time,
731                session=self.params.refresh_token.session,
732            )
733            id_token = IDToken.new(
734                self.provider,
735                refresh_token,
736                self.request,
737            )
738            id_token.nonce = self.params.refresh_token.id_token.nonce
739            id_token.at_hash = access_token.at_hash
740            refresh_token.id_token = id_token
741            refresh_token.save()
742
743            # Mark old token as revoked
744            self.params.refresh_token.revoked = True
745            self.params.refresh_token.save()
746            res["refresh_token"] = refresh_token.token
747
748        return res
749
750    def create_client_credentials_response(self) -> dict[str, Any]:
751        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4"""
752        now = timezone.now()
753        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
754        access_token = AccessToken(
755            provider=self.provider,
756            user=self.params.user,
757            expires=access_token_expiry,
758            scope=self.params.scope,
759            auth_time=now,
760        )
761        access_token.id_token = IDToken.new(
762            self.provider,
763            access_token,
764            self.request,
765        )
766        access_token.save()
767        return {
768            "access_token": access_token.token,
769            "token_type": TOKEN_TYPE,
770            "scope": " ".join(access_token.scope),
771            "expires_in": int(
772                timedelta_from_string(self.provider.access_token_validity).total_seconds()
773            ),
774            "id_token": access_token.id_token.to_jwt(self.provider),
775        }
776
777    def create_device_code_response(self) -> dict[str, Any]:
778        """See https://datatracker.ietf.org/doc/html/rfc8628"""
779        if not self.params.device_code.user:
780            raise DeviceCodeError("authorization_pending")
781        now = timezone.now()
782        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
783        auth_event = get_login_event(self.params.device_code.session)
784        access_token = AccessToken(
785            provider=self.provider,
786            user=self.params.device_code.user,
787            expires=access_token_expiry,
788            scope=self.params.device_code.scope,
789            auth_time=auth_event.created if auth_event else now,
790            session=self.params.device_code.session,
791        )
792        access_token.id_token = IDToken.new(
793            self.provider,
794            access_token,
795            self.request,
796        )
797        access_token.save()
798
799        response = {
800            "access_token": access_token.token,
801            "token_type": TOKEN_TYPE,
802            "scope": " ".join(access_token.scope),
803            "expires_in": int(
804                timedelta_from_string(self.provider.access_token_validity).total_seconds()
805            ),
806            "id_token": access_token.id_token.to_jwt(self.provider),
807        }
808
809        if SCOPE_OFFLINE_ACCESS in self.params.device_code.scope:
810            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
811            refresh_token = RefreshToken(
812                user=self.params.device_code.user,
813                scope=self.params.device_code.scope,
814                expires=refresh_token_expiry,
815                provider=self.provider,
816                auth_time=auth_event.created if auth_event else now,
817            )
818            id_token = IDToken.new(
819                self.provider,
820                refresh_token,
821                self.request,
822            )
823            id_token.at_hash = access_token.at_hash
824            refresh_token.id_token = id_token
825            refresh_token.save()
826            response["refresh_token"] = refresh_token.token
827
828        # Delete device code
829        self.params.device_code.delete()
830        return response
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass(slots=True)
class TokenParams:
 82@dataclass(slots=True)
 83class TokenParams:
 84    """Token params"""
 85
 86    client_id: str
 87    client_secret: str
 88    redirect_uri: str
 89    grant_type: str
 90    state: str
 91    scope: set[str]
 92
 93    provider: OAuth2Provider
 94
 95    authorization_code: AuthorizationCode | None = None
 96    refresh_token: RefreshToken | None = None
 97    device_code: DeviceToken | None = None
 98    user: User | None = None
 99
100    code_verifier: str | None = None
101
102    raw_code: InitVar[str] = ""
103    raw_token: InitVar[str] = ""
104    request: InitVar[HttpRequest | None] = None
105
106    @staticmethod
107    def parse(
108        request: HttpRequest,
109        provider: OAuth2Provider,
110        client_id: str,
111        client_secret: str,
112    ) -> TokenParams:
113        """Parse params for request"""
114        return TokenParams(
115            # Init vars
116            raw_code=request.POST.get("code", ""),
117            raw_token=request.POST.get("refresh_token", ""),
118            request=request,
119            # Regular params
120            provider=provider,
121            client_id=client_id,
122            client_secret=client_secret,
123            redirect_uri=request.POST.get("redirect_uri", ""),
124            grant_type=request.POST.get("grant_type", ""),
125            state=request.POST.get("state", ""),
126            scope=set(request.POST.get("scope", "").split()),
127            # PKCE parameter.
128            code_verifier=request.POST.get("code_verifier"),
129        )
130
131    def __check_scopes(self):
132        allowed_scope_names = set(
133            ScopeMapping.objects.filter(provider__in=[self.provider]).values_list(
134                "scope_name", flat=True
135            )
136        )
137        scopes_to_check = self.scope
138        if not scopes_to_check.issubset(allowed_scope_names):
139            LOGGER.info(
140                "Application requested scopes not configured, setting to overlap",
141                scope_allowed=allowed_scope_names,
142                scope_given=self.scope,
143            )
144            self.scope = self.scope.intersection(allowed_scope_names)
145
146    def __check_policy_access(self, app: Application, request: HttpRequest, **kwargs):
147        with start_span(
148            op="authentik.providers.oauth2.token.policy",
149        ):
150            user = self.user if self.user else get_anonymous_user()
151            engine = PolicyEngine(app, user, request)
152            engine.empty_result = AppAccessWithoutBindings.get()
153            # Don't cache as for client_credentials flows the user will not be set
154            # so we'll get generic cache results
155            engine.use_cache = False
156            engine.request.context["oauth_scopes"] = self.scope
157            engine.request.context["oauth_grant_type"] = self.grant_type
158            engine.request.context["oauth_code_verifier"] = self.code_verifier
159            engine.request.context.update(kwargs)
160            engine.build()
161            result = engine.result
162            if not result.passing:
163                LOGGER.info(
164                    "User not authenticated for application", user=self.user, app_slug=app.slug
165                )
166                raise TokenError("invalid_grant")
167
168    def __post_init__(self, raw_code: str, raw_token: str, request: HttpRequest):
169        if self.grant_type in [GRANT_TYPE_AUTHORIZATION_CODE, GRANT_TYPE_REFRESH_TOKEN]:
170            if self.provider.client_type == ClientTypes.CONFIDENTIAL and not compare_digest(
171                self.provider.client_secret, self.client_secret
172            ):
173                LOGGER.warning(
174                    "Invalid client secret",
175                    client_id=self.provider.client_id,
176                )
177                raise TokenError("invalid_client")
178        self.__check_scopes()
179        if self.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
180            with start_span(
181                op="authentik.providers.oauth2.post.parse.code",
182            ):
183                self.__post_init_code(raw_code, request)
184        elif self.grant_type == GRANT_TYPE_REFRESH_TOKEN:
185            with start_span(
186                op="authentik.providers.oauth2.post.parse.refresh",
187            ):
188                self.__post_init_refresh(raw_token, request)
189        elif self.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
190            with start_span(
191                op="authentik.providers.oauth2.post.parse.client_credentials",
192            ):
193                self.__post_init_client_credentials(request)
194        elif self.grant_type == GRANT_TYPE_DEVICE_CODE:
195            with start_span(
196                op="authentik.providers.oauth2.post.parse.device_code",
197            ):
198                self.__post_init_device_code(request)
199        else:
200            LOGGER.warning("Invalid grant type", grant_type=self.grant_type)
201            raise TokenError("unsupported_grant_type")
202
203    def __post_init_code(self, raw_code: str, request: HttpRequest):
204        if not raw_code:
205            LOGGER.warning("Missing authorization code")
206            raise TokenError("invalid_grant")
207
208        self.__check_redirect_uri(request)
209
210        self.authorization_code = AuthorizationCode.objects.filter(code=raw_code).first()
211        if not self.authorization_code:
212            LOGGER.warning("Code does not exist", code=raw_code)
213            raise TokenError("invalid_grant")
214
215        if self.authorization_code.is_expired:
216            LOGGER.warning(
217                "Code is expired",
218                token=raw_code,
219            )
220            raise TokenError("invalid_grant")
221
222        if self.authorization_code.provider != self.provider or self.authorization_code.is_expired:
223            LOGGER.warning("Invalid code: invalid client or code has expired")
224            raise TokenError("invalid_grant")
225
226        # Validate PKCE parameters.
227        if self.authorization_code.code_challenge:
228            # Authorization code had PKCE but we didn't get one
229            if not self.code_verifier:
230                raise TokenError("invalid_grant")
231            if self.authorization_code.code_challenge_method == PKCE_METHOD_S256:
232                new_code_challenge = pkce_s256_challenge(self.code_verifier)
233            else:
234                new_code_challenge = self.code_verifier
235
236            if new_code_challenge != self.authorization_code.code_challenge:
237                LOGGER.warning("Code challenge not matching")
238                raise TokenError("invalid_grant")
239        # Token request had a code_verifier but code did not have a code challenge
240        # Prevent downgrade
241        if not self.authorization_code.code_challenge and self.code_verifier:
242            raise TokenError("invalid_grant")
243
244    def __check_redirect_uri(self, request: HttpRequest):
245        allowed_redirect_urls = self.provider.redirect_uris
246        # At this point, no provider should have a blank redirect_uri, in case they do
247        # this will check an empty array and raise an error
248
249        match_found = False
250        for allowed in allowed_redirect_urls:
251            if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
252                if self.redirect_uri == allowed.url:
253                    match_found = True
254                    break
255            if allowed.matching_mode == RedirectURIMatchingMode.REGEX:
256                try:
257                    if fullmatch(allowed.url, self.redirect_uri):
258                        match_found = True
259                        break
260                except RegexError as exc:
261                    LOGGER.warning(
262                        "Failed to parse regular expression",
263                        exc=exc,
264                        url=allowed.url,
265                        provider=self.provider,
266                    )
267                    Event.new(
268                        EventAction.CONFIGURATION_ERROR,
269                        message="Invalid redirect_uri configured",
270                        provider=self.provider,
271                    ).from_http(request)
272        if not match_found:
273            Event.new(
274                EventAction.CONFIGURATION_ERROR,
275                message="Invalid redirect URI used by provider",
276                provider=self.provider,
277                redirect_uri=self.redirect_uri,
278                expected=allowed_redirect_urls,
279            ).from_http(request)
280            raise TokenError("invalid_client")
281
282        # Check against forbidden schemes
283        if urlparse(self.redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
284            raise TokenError("invalid_request")
285
286    def __post_init_refresh(self, raw_token: str, request: HttpRequest):
287        if not raw_token:
288            LOGGER.warning("Missing refresh token")
289            raise TokenError("invalid_grant")
290
291        self.refresh_token = RefreshToken.objects.filter(
292            token=raw_token, provider=self.provider
293        ).first()
294        if not self.refresh_token:
295            LOGGER.warning(
296                "Refresh token does not exist",
297                token=raw_token,
298            )
299            raise TokenError("invalid_grant")
300        if self.refresh_token.is_expired:
301            LOGGER.warning(
302                "Refresh token is expired",
303                token=raw_token,
304            )
305            raise TokenError("invalid_grant")
306        # https://datatracker.ietf.org/doc/html/rfc6749#section-6
307        # Fallback to original token's scopes when none are given
308        if not self.scope:
309            self.scope = self.refresh_token.scope
310        if self.refresh_token.revoked:
311            LOGGER.warning("Refresh token is revoked", token=raw_token)
312            Event.new(
313                action=EventAction.SUSPICIOUS_REQUEST,
314                message="Revoked refresh token was used",
315                token=self.refresh_token,
316                provider=self.refresh_token.provider,
317            ).from_http(request, user=self.refresh_token.user)
318            raise TokenError("invalid_grant")
319
320    def __post_init_client_credentials(self, request: HttpRequest):
321        # client_credentials flow with client assertion
322        if request.POST.get(CLIENT_ASSERTION_TYPE, "") != "":
323            return self.__post_init_client_credentials_jwt(request)
324        # authentik-custom-ish client credentials flow
325        if request.POST.get("username", "") != "":
326            return self.__post_init_client_credentials_creds(
327                request, request.POST.get("username"), request.POST.get("password")
328            )
329        # Standard method which creates an automatic user
330        if self.client_secret == self.provider.client_secret:
331            return self.__post_init_client_credentials_generated(request)
332        # Standard workaround method which stores username:password
333        # as client_secret
334        try:
335            user, _, password = b64decode(self.client_secret).decode("utf-8").partition(":")
336            return self.__post_init_client_credentials_creds(request, user, password)
337        except ValueError, Error:
338            raise TokenError("invalid_grant") from None
339
340    def __post_init_client_credentials_creds(
341        self, request: HttpRequest, username: str, password: str
342    ):
343        # Authenticate user based on credentials
344        user = User.objects.filter(username=username, is_active=True).first()
345        if not user:
346            raise TokenError("invalid_grant")
347        token: Token = Token.objects.filter(
348            key=password, intent=TokenIntents.INTENT_APP_PASSWORD, user=user
349        ).first()
350        if not token or token.user.uid != user.uid:
351            raise TokenError("invalid_grant")
352        self.user = user
353        # Authorize user access
354        app = Application.objects.filter(provider=self.provider).first()
355        if not app or not app.provider:
356            raise TokenError("invalid_grant")
357        self.__check_policy_access(app, request)
358
359        Event.new(
360            action=EventAction.LOGIN,
361            **{
362                PLAN_CONTEXT_METHOD: "token",
363                PLAN_CONTEXT_METHOD_ARGS: {
364                    "identifier": token.identifier,
365                },
366                PLAN_CONTEXT_APPLICATION: app,
367            },
368        ).from_http(request, user=user)
369
370    def __validate_jwt_from_source(
371        self, assertion: str
372    ) -> tuple[dict, OAuthSource] | tuple[None, None]:
373        # Fully decode the JWT without verifying the signature, so we can get access to
374        # the header.
375        # Get the Key ID from the header, and use that to optimize our source query to only find
376        # sources that have a JWK for that Key ID
377        # The Key ID doesn't have a fixed format, but must match between an issued JWT
378        # and whatever is returned by the JWKS endpoint
379        try:
380            decode_unvalidated = PyJWT().decode_complete(
381                assertion, options={"verify_signature": False}
382            )
383        except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
384            LOGGER.warning("failed to parse JWT for kid lookup", exc=exc)
385            raise TokenError("invalid_grant") from None
386        expected_kid = decode_unvalidated["header"].get("kid")
387        fallback_alg = decode_unvalidated["header"].get("alg")
388        token = source = None
389        if not expected_kid or not fallback_alg:
390            return None, None
391        for source in self.provider.jwt_federation_sources.filter(
392            oidc_jwks__keys__contains=[{"kid": expected_kid}]
393        ):
394            LOGGER.debug("verifying JWT with source", source=source.slug)
395            keys = source.oidc_jwks.get("keys", [])
396            for key in keys:
397                if key.get("kid") and key.get("kid") != expected_kid:
398                    continue
399                LOGGER.debug("verifying JWT with key", source=source.slug, key=key.get("kid"))
400                try:
401                    parsed_key = PyJWK.from_dict(key).key
402                    token = decode(
403                        assertion,
404                        parsed_key,
405                        algorithms=[key.get("alg")] if "alg" in key else [fallback_alg],
406                        options={
407                            "verify_aud": False,
408                        },
409                    )
410                # AttributeError is raised when the configured JWK is a private key
411                # and not a public key
412                except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
413                    LOGGER.warning("failed to verify JWT", exc=exc, source=source.slug)
414        if token:
415            LOGGER.info("successfully verified JWT with source", source=source.slug)
416        return token, source
417
418    def __validate_jwt_from_provider(
419        self, assertion: str
420    ) -> tuple[dict, OAuth2Provider] | tuple[None, None]:
421        token = provider = _key = None
422        federated_token = AccessToken.objects.filter(
423            token=assertion, provider__in=self.provider.jwt_federation_providers.all()
424        ).first()
425        if federated_token:
426            _key, _alg = federated_token.provider.jwt_key
427            try:
428                token = decode(
429                    assertion,
430                    _key.public_key(),
431                    algorithms=[_alg],
432                    options={
433                        "verify_aud": False,
434                    },
435                )
436                provider = federated_token.provider
437                self.user = federated_token.user
438            except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
439                LOGGER.warning(
440                    "failed to verify JWT", exc=exc, provider=federated_token.provider.name
441                )
442
443        if token:
444            LOGGER.info("successfully verified JWT with provider", provider=provider.name)
445        return token, provider
446
447    def __post_init_client_credentials_jwt(self, request: HttpRequest):
448        assertion_type = request.POST.get(CLIENT_ASSERTION_TYPE, "")
449        if assertion_type != CLIENT_ASSERTION_TYPE_JWT:
450            LOGGER.warning("Invalid assertion type", assertion_type=assertion_type)
451            raise TokenError("invalid_grant")
452
453        client_secret = request.POST.get("client_secret", None)
454        assertion = request.POST.get(CLIENT_ASSERTION, client_secret)
455        if not assertion:
456            LOGGER.warning("Missing client assertion")
457            raise TokenError("invalid_grant")
458
459        source = provider = None
460
461        token, source = self.__validate_jwt_from_source(assertion)
462        if not token:
463            token, provider = self.__validate_jwt_from_provider(assertion)
464
465        if not token:
466            LOGGER.warning("No token could be verified")
467            raise TokenError("invalid_grant")
468
469        if "exp" in token:
470            exp = datetime.fromtimestamp(token["exp"])
471            # Non-timezone aware check since we assume `exp` is in UTC
472            if datetime.now() >= exp:
473                LOGGER.info("JWT token expired")
474                raise TokenError("invalid_grant")
475
476        app = Application.objects.filter(provider=self.provider).first()
477        if not app or not app.provider:
478            LOGGER.info("client_credentials grant for provider without application")
479            raise TokenError("invalid_grant")
480
481        self.__check_policy_access(app, request, oauth_jwt=token)
482        if not provider:
483            self.__create_user_from_jwt(token, app, source, request)
484
485        method_args = {
486            "jwt": token,
487        }
488        if source:
489            method_args["source"] = source
490        if provider:
491            method_args["provider"] = provider
492        Event.new(
493            action=EventAction.LOGIN,
494            **{
495                PLAN_CONTEXT_METHOD: "jwt",
496                PLAN_CONTEXT_METHOD_ARGS: method_args,
497                PLAN_CONTEXT_APPLICATION: app,
498            },
499        ).from_http(request, user=self.user)
500
501    def __post_init_client_credentials_generated(self, request: HttpRequest):
502        # Authorize user access
503        app = Application.objects.filter(provider=self.provider).first()
504        if not app or not app.provider:
505            raise TokenError("invalid_grant")
506        with audit_ignore():
507            self.user, _ = User.objects.update_or_create(
508                # trim username to ensure the entire username is max 150 chars
509                # (22 chars being the length of the "template")
510                username=f"ak-{self.provider.name[: USERNAME_MAX_LENGTH - 22]}-client_credentials",
511                defaults={
512                    "last_login": timezone.now(),
513                    "name": f"Autogenerated user from application {app.name} (client credentials)",
514                    "path": f"{USER_PATH_SYSTEM_PREFIX}/apps/{app.slug}",
515                    "type": UserTypes.SERVICE_ACCOUNT,
516                },
517            )
518            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
519            self.user.save()
520        self.__check_policy_access(app, request)
521
522        Event.new(
523            action=EventAction.LOGIN,
524            **{
525                PLAN_CONTEXT_METHOD: "oauth_client_secret",
526                PLAN_CONTEXT_APPLICATION: app,
527            },
528        ).from_http(request, user=self.user)
529
530    def __post_init_device_code(self, request: HttpRequest):
531        device_code = request.POST.get("device_code", "")
532        code = DeviceToken.objects.filter(device_code=device_code, provider=self.provider).first()
533        if not code:
534            raise TokenError("invalid_grant")
535        self.device_code = code
536
537    def __create_user_from_jwt(
538        self, token: dict[str, Any], app: Application, source: OAuthSource, request: HttpRequest
539    ):
540        """Create user from JWT"""
541        with audit_ignore():
542            # Run the JWT payload through the core mapping engine
543            mapped = SourceMapper(source).build_object_properties(
544                User, request=request, info=token, oauth_userinfo=token
545            )
546
547            self.user, created = User.objects.update_or_create(
548                username=mapped.get("username", f"{self.provider.name}-{token.get('sub')}")[
549                    :USERNAME_MAX_LENGTH
550                ],
551                defaults={
552                    "last_login": timezone.now(),
553                    "name": mapped.get(
554                        "name",
555                        f"Autogenerated user from application {app.name} (client credentials JWT)",
556                    ),
557                    "email": mapped.get("email", ""),
558                    "path": source.get_user_path(),
559                    "type": UserTypes.SERVICE_ACCOUNT,
560                    "attributes": mapped.get("attributes", {}),
561                },
562            )
563            self.user.attributes[USER_ATTRIBUTE_GENERATED] = True
564            self.user.save()
565            exp = token.get("exp")
566            if created and exp:
567                self.user.attributes[USER_ATTRIBUTE_EXPIRES] = exp
568                self.user.save()

Parameters of an OAuth2 token request, validated according to the grant type when the instance is created.

TokenParams( client_id: str, client_secret: str, redirect_uri: str, grant_type: str, state: str, scope: set[str], provider: authentik.providers.oauth2.models.OAuth2Provider, authorization_code: authentik.providers.oauth2.models.AuthorizationCode | None = None, refresh_token: authentik.providers.oauth2.models.RefreshToken | None = None, device_code: authentik.providers.oauth2.models.DeviceToken | None = None, user: authentik.core.models.User | None = None, code_verifier: str | None = None, raw_code: dataclasses.InitVar[str] = '', raw_token: dataclasses.InitVar[str] = '', request: dataclasses.InitVar[django.http.request.HttpRequest | None] = None)
client_id: str
client_secret: str
redirect_uri: str
grant_type: str
state: str
scope: set[str]
code_verifier: str | None
raw_code: dataclasses.InitVar[str] = ''
raw_token: dataclasses.InitVar[str] = ''
request: dataclasses.InitVar[django.http.request.HttpRequest | None] = None
@staticmethod
def parse( request: django.http.request.HttpRequest, provider: authentik.providers.oauth2.models.OAuth2Provider, client_id: str, client_secret: str) -> TokenParams:
106    @staticmethod
107    def parse(
108        request: HttpRequest,
109        provider: OAuth2Provider,
110        client_id: str,
111        client_secret: str,
112    ) -> TokenParams:
113        """Parse params for request"""
114        return TokenParams(
115            # Init vars
116            raw_code=request.POST.get("code", ""),
117            raw_token=request.POST.get("refresh_token", ""),
118            request=request,
119            # Regular params
120            provider=provider,
121            client_id=client_id,
122            client_secret=client_secret,
123            redirect_uri=request.POST.get("redirect_uri", ""),
124            grant_type=request.POST.get("grant_type", ""),
125            state=request.POST.get("state", ""),
126            scope=set(request.POST.get("scope", "").split()),
127            # PKCE parameter.
128            code_verifier=request.POST.get("code_verifier"),
129        )

Build a TokenParams instance from the POST data of a token request.

@method_decorator(csrf_exempt, name='dispatch')
class TokenView(django.views.generic.base.View):
@method_decorator(csrf_exempt, name="dispatch")
class TokenView(View):
    """Generate tokens for clients"""

    # Provider matching the client_id of the current request; populated by post()
    provider: OAuth2Provider | None = None
    # Parsed parameters of the current request; populated by post()
    params: TokenParams | None = None
    # Classes post() uses to parse the request and to look up the provider. They are read
    # through self, so a subclass can override them.
    params_class = TokenParams
    provider_class = OAuth2Provider

    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        """Run the regular view dispatch, then hand the response to ``cors_allow``.

        The allowed origins are the URLs of the provider's redirect URIs. When no provider
        was resolved while handling the request, no origins are passed.
        """
        response = super().dispatch(request, *args, **kwargs)
        # self.provider is only set by the handler, so it has to be read after dispatching
        origins = [entry.url for entry in self.provider.redirect_uris] if self.provider else []
        cors_allow(self.request, response, *origins)
        return response

    def options(self, request: HttpRequest) -> HttpResponse:
        """Answer OPTIONS requests with an empty TokenResponse."""
        empty_payload: dict[str, Any] = {}
        return TokenResponse(empty_payload)

    def post(self, request: HttpRequest) -> HttpResponse:
        """Generate tokens for clients

        Resolves the provider from the client credentials, parses the request parameters
        and builds the response for the requested grant type. TokenError and
        DeviceCodeError are rendered with status 400, UserAuthError with status 403.
        """
        try:
            with start_span(op="authentik.providers.oauth2.post.parse"):
                client_id, client_secret = extract_client_auth(request)
                self.provider = self.provider_class.objects.filter(client_id=client_id).first()
                if not self.provider:
                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
                    raise TokenError("invalid_client")
                CTX_AUTH_VIA.set("oauth_client_secret")
                self.params = self.params_class.parse(
                    request, self.provider, client_id, client_secret
                )

            with start_span(op="authentik.providers.oauth2.post.response"):
                grant_type = self.params.grant_type
                if grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
                    LOGGER.debug("Converting authorization code to access token")
                    payload = self.create_code_response()
                elif grant_type == GRANT_TYPE_REFRESH_TOKEN:
                    LOGGER.debug("Refreshing refresh token")
                    payload = self.create_refresh_response()
                elif grant_type in (GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD):
                    LOGGER.debug("Client credentials/password grant")
                    payload = self.create_client_credentials_response()
                elif grant_type == GRANT_TYPE_DEVICE_CODE:
                    LOGGER.debug("Device code grant")
                    payload = self.create_device_code_response()
                else:
                    raise TokenError("unsupported_grant_type")
                return TokenResponse(payload)
        except (TokenError, DeviceCodeError) as error:
            return TokenResponse(error.create_dict(request), status=400)
        except UserAuthError as error:
            return TokenResponse(error.create_dict(request), status=403)

    def create_code_response(self) -> dict[str, Any]:
        """Exchange the authorization code held in the parsed params for tokens.

        See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1

        A refresh token is only issued when the code's scope contains
        SCOPE_OFFLINE_ACCESS. The authorization code is deleted afterwards.
        """
        code = self.params.authorization_code
        issued_at = timezone.now()
        access_token = AccessToken(
            provider=self.provider,
            user=code.user,
            expires=issued_at + timedelta_from_string(self.provider.access_token_validity),
            # The access token carries exactly the scopes of the code
            scope=code.scope,
            auth_time=code.auth_time,
            session=code.session,
        )
        access_id_token = IDToken.new(self.provider, access_token, self.request)
        access_id_token.nonce = code.nonce
        access_token.id_token = access_id_token
        access_token.save()

        response = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        if SCOPE_OFFLINE_ACCESS in code.scope:
            refresh_token = RefreshToken(
                user=code.user,
                scope=code.scope,
                expires=issued_at + timedelta_from_string(self.provider.refresh_token_validity),
                provider=self.provider,
                auth_time=code.auth_time,
                session=code.session,
            )
            refresh_id_token = IDToken.new(self.provider, refresh_token, self.request)
            refresh_id_token.nonce = code.nonce
            refresh_id_token.at_hash = access_token.at_hash
            refresh_token.id_token = refresh_id_token
            refresh_token.save()
            response["refresh_token"] = refresh_token.token

        # The code has been used, remove it
        code.delete()
        return response

    def create_refresh_response(self) -> dict[str, Any]:
        """Issue a new access token, and possibly a new refresh token, for a refresh token.

        See https://datatracker.ietf.org/doc/html/rfc6749#section-6

        A replacement refresh token is issued (and the presented one marked as revoked)
        when the provider's refresh token threshold is zero, or when the presented token
        expires sooner than that threshold from now. Otherwise the response contains no
        ``refresh_token`` key.

        NOTE(review): this treats ``refresh_token_threshold`` as the remaining lifetime
        below which the refresh token gets replaced - confirm against the provider model.

        Raises:
            TokenError: ``invalid_scope`` when the requested scopes are not a subset of
                the refresh token's scopes, or do not contain SCOPE_OFFLINE_ACCESS.
        """
        old_token = self.params.refresh_token
        unauthorized_scopes = set(self.params.scope) - set(old_token.scope)
        if unauthorized_scopes:
            raise TokenError("invalid_scope")
        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
            raise TokenError("invalid_scope")
        now = timezone.now()
        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
        access_token = AccessToken(
            provider=self.provider,
            user=old_token.user,
            expires=access_token_expiry,
            # Keep same scopes as previous token
            scope=old_token.scope,
            auth_time=old_token.auth_time,
            session=old_token.session,
        )
        access_token.id_token = IDToken.new(self.provider, access_token, self.request)
        access_token.save()

        res = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        refresh_token_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
        # Compare the *remaining* lifetime (expires - now) against the threshold. The
        # previous form, (now - expires) > threshold, is negative for every token that has
        # not expired yet and therefore never matched a non-zero threshold.
        if (
            refresh_token_threshold.total_seconds() == 0
            or (old_token.expires - now) < refresh_token_threshold
        ):
            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
            refresh_token = RefreshToken(
                user=old_token.user,
                scope=old_token.scope,
                expires=refresh_token_expiry,
                provider=self.provider,
                auth_time=old_token.auth_time,
                session=old_token.session,
            )
            id_token = IDToken.new(self.provider, refresh_token, self.request)
            id_token.nonce = old_token.id_token.nonce
            id_token.at_hash = access_token.at_hash
            refresh_token.id_token = id_token
            refresh_token.save()

            # Mark old token as revoked
            old_token.revoked = True
            old_token.save()
            res["refresh_token"] = refresh_token.token

        return res

    def create_client_credentials_response(self) -> dict[str, Any]:
        """Issue an access token for the client credentials and password grants.

        See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4

        The token is issued for the user and scopes held in the parsed params, with the
        current time as authentication time. No refresh token is created.
        """
        issued_at = timezone.now()
        token = AccessToken(
            provider=self.provider,
            user=self.params.user,
            expires=issued_at + timedelta_from_string(self.provider.access_token_validity),
            scope=self.params.scope,
            auth_time=issued_at,
        )
        token.id_token = IDToken.new(self.provider, token, self.request)
        token.save()
        lifetime = timedelta_from_string(self.provider.access_token_validity)
        payload = {
            "access_token": token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(token.scope),
            "expires_in": int(lifetime.total_seconds()),
            "id_token": token.id_token.to_jwt(self.provider),
        }
        return payload

    def create_device_code_response(self) -> dict[str, Any]:
        """Issue tokens for a device code once a user is attached to it.

        See https://datatracker.ietf.org/doc/html/rfc8628

        A refresh token is only issued when the device code's scope contains
        SCOPE_OFFLINE_ACCESS. The device code is deleted afterwards.

        Raises:
            DeviceCodeError: ``authorization_pending`` while the device code has no user.
        """
        device_code = self.params.device_code
        if not device_code.user:
            raise DeviceCodeError("authorization_pending")
        issued_at = timezone.now()
        access_expiry = issued_at + timedelta_from_string(self.provider.access_token_validity)
        login_event = get_login_event(device_code.session)
        # Fall back to the current time when no login event is found for the session
        auth_time = login_event.created if login_event else issued_at
        access_token = AccessToken(
            provider=self.provider,
            user=device_code.user,
            expires=access_expiry,
            scope=device_code.scope,
            auth_time=auth_time,
            session=device_code.session,
        )
        access_token.id_token = IDToken.new(self.provider, access_token, self.request)
        access_token.save()

        response = {
            "access_token": access_token.token,
            "token_type": TOKEN_TYPE,
            "scope": " ".join(access_token.scope),
            "expires_in": int(
                timedelta_from_string(self.provider.access_token_validity).total_seconds()
            ),
            "id_token": access_token.id_token.to_jwt(self.provider),
        }

        if SCOPE_OFFLINE_ACCESS in device_code.scope:
            # NOTE(review): unlike the access token, this refresh token is created without
            # a session - confirm that this is intended.
            refresh_token = RefreshToken(
                user=device_code.user,
                scope=device_code.scope,
                expires=issued_at + timedelta_from_string(self.provider.refresh_token_validity),
                provider=self.provider,
                auth_time=auth_time,
            )
            refresh_id_token = IDToken.new(self.provider, refresh_token, self.request)
            refresh_id_token.at_hash = access_token.at_hash
            refresh_token.id_token = refresh_id_token
            refresh_token.save()
            response["refresh_token"] = refresh_token.token

        # The device code has been used, remove it
        device_code.delete()
        return response

Generate tokens for clients

params: TokenParams | None = None
params_class = <class 'TokenParams'>
def dispatch( self, request: django.http.request.HttpRequest, *args: Any, **kwargs: Any) -> django.http.response.HttpResponse:
580    def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
581        response = super().dispatch(request, *args, **kwargs)
582        allowed_origins = []
583        if self.provider:
584            allowed_origins = [x.url for x in self.provider.redirect_uris]
585        cors_allow(self.request, response, *allowed_origins)
586        return response
def options( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
588    def options(self, request: HttpRequest) -> HttpResponse:
589        return TokenResponse({})

Handle responding to requests for the OPTIONS HTTP verb.

def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
591    def post(self, request: HttpRequest) -> HttpResponse:
592        """Generate tokens for clients"""
593        try:
594            with start_span(
595                op="authentik.providers.oauth2.post.parse",
596            ):
597                client_id, client_secret = extract_client_auth(request)
598                self.provider = self.provider_class.objects.filter(client_id=client_id).first()
599                if not self.provider:
600                    LOGGER.warning("OAuth2Provider does not exist", client_id=client_id)
601                    raise TokenError("invalid_client")
602                CTX_AUTH_VIA.set("oauth_client_secret")
603                self.params = self.params_class.parse(
604                    request, self.provider, client_id, client_secret
605                )
606
607            with start_span(
608                op="authentik.providers.oauth2.post.response",
609            ):
610                if self.params.grant_type == GRANT_TYPE_AUTHORIZATION_CODE:
611                    LOGGER.debug("Converting authorization code to access token")
612                    return TokenResponse(self.create_code_response())
613                if self.params.grant_type == GRANT_TYPE_REFRESH_TOKEN:
614                    LOGGER.debug("Refreshing refresh token")
615                    return TokenResponse(self.create_refresh_response())
616                if self.params.grant_type in [GRANT_TYPE_CLIENT_CREDENTIALS, GRANT_TYPE_PASSWORD]:
617                    LOGGER.debug("Client credentials/password grant")
618                    return TokenResponse(self.create_client_credentials_response())
619                if self.params.grant_type == GRANT_TYPE_DEVICE_CODE:
620                    LOGGER.debug("Device code grant")
621                    return TokenResponse(self.create_device_code_response())
622                raise TokenError("unsupported_grant_type")
623        except (TokenError, DeviceCodeError) as error:
624            return TokenResponse(error.create_dict(request), status=400)
625        except UserAuthError as error:
626            return TokenResponse(error.create_dict(request), status=403)

Generate tokens for clients

def create_code_response(self) -> dict[str, typing.Any]:
628    def create_code_response(self) -> dict[str, Any]:
629        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.1"""
630        now = timezone.now()
631        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
632        access_token = AccessToken(
633            provider=self.provider,
634            user=self.params.authorization_code.user,
635            expires=access_token_expiry,
636            # Keep same scopes as previous token
637            scope=self.params.authorization_code.scope,
638            auth_time=self.params.authorization_code.auth_time,
639            session=self.params.authorization_code.session,
640        )
641        access_id_token = IDToken.new(
642            self.provider,
643            access_token,
644            self.request,
645        )
646        access_id_token.nonce = self.params.authorization_code.nonce
647        access_token.id_token = access_id_token
648        access_token.save()
649
650        response = {
651            "access_token": access_token.token,
652            "token_type": TOKEN_TYPE,
653            "scope": " ".join(access_token.scope),
654            "expires_in": int(
655                timedelta_from_string(self.provider.access_token_validity).total_seconds()
656            ),
657            "id_token": access_token.id_token.to_jwt(self.provider),
658        }
659
660        if SCOPE_OFFLINE_ACCESS in self.params.authorization_code.scope:
661            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
662            refresh_token = RefreshToken(
663                user=self.params.authorization_code.user,
664                scope=self.params.authorization_code.scope,
665                expires=refresh_token_expiry,
666                provider=self.provider,
667                auth_time=self.params.authorization_code.auth_time,
668                session=self.params.authorization_code.session,
669            )
670            id_token = IDToken.new(
671                self.provider,
672                refresh_token,
673                self.request,
674            )
675            id_token.nonce = self.params.authorization_code.nonce
676            id_token.at_hash = access_token.at_hash
677            refresh_token.id_token = id_token
678            refresh_token.save()
679            response["refresh_token"] = refresh_token.token
680
681        # Delete old code
682        self.params.authorization_code.delete()
683        return response
def create_refresh_response(self) -> dict[str, typing.Any]:
685    def create_refresh_response(self) -> dict[str, Any]:
686        """See https://datatracker.ietf.org/doc/html/rfc6749#section-6"""
687        unauthorized_scopes = set(self.params.scope) - set(self.params.refresh_token.scope)
688        if unauthorized_scopes:
689            raise TokenError("invalid_scope")
690        if SCOPE_OFFLINE_ACCESS not in self.params.scope:
691            raise TokenError("invalid_scope")
692        now = timezone.now()
693        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
694        access_token = AccessToken(
695            provider=self.provider,
696            user=self.params.refresh_token.user,
697            expires=access_token_expiry,
698            # Keep same scopes as previous token
699            scope=self.params.refresh_token.scope,
700            auth_time=self.params.refresh_token.auth_time,
701            session=self.params.refresh_token.session,
702        )
703        access_token.id_token = IDToken.new(
704            self.provider,
705            access_token,
706            self.request,
707        )
708        access_token.save()
709
710        res = {
711            "access_token": access_token.token,
712            "token_type": TOKEN_TYPE,
713            "scope": " ".join(access_token.scope),
714            "expires_in": int(
715                timedelta_from_string(self.provider.access_token_validity).total_seconds()
716            ),
717            "id_token": access_token.id_token.to_jwt(self.provider),
718        }
719
720        refresh_token_threshold = timedelta_from_string(self.provider.refresh_token_threshold)
721        if (
722            refresh_token_threshold.total_seconds() == 0
723            or (now - self.params.refresh_token.expires) > refresh_token_threshold
724        ):
725            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
726            refresh_token = RefreshToken(
727                user=self.params.refresh_token.user,
728                scope=self.params.refresh_token.scope,
729                expires=refresh_token_expiry,
730                provider=self.provider,
731                auth_time=self.params.refresh_token.auth_time,
732                session=self.params.refresh_token.session,
733            )
734            id_token = IDToken.new(
735                self.provider,
736                refresh_token,
737                self.request,
738            )
739            id_token.nonce = self.params.refresh_token.id_token.nonce
740            id_token.at_hash = access_token.at_hash
741            refresh_token.id_token = id_token
742            refresh_token.save()
743
744            # Mark old token as revoked
745            self.params.refresh_token.revoked = True
746            self.params.refresh_token.save()
747            res["refresh_token"] = refresh_token.token
748
749        return res
def create_client_credentials_response(self) -> dict[str, typing.Any]:
751    def create_client_credentials_response(self) -> dict[str, Any]:
752        """See https://datatracker.ietf.org/doc/html/rfc6749#section-4.4"""
753        now = timezone.now()
754        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
755        access_token = AccessToken(
756            provider=self.provider,
757            user=self.params.user,
758            expires=access_token_expiry,
759            scope=self.params.scope,
760            auth_time=now,
761        )
762        access_token.id_token = IDToken.new(
763            self.provider,
764            access_token,
765            self.request,
766        )
767        access_token.save()
768        return {
769            "access_token": access_token.token,
770            "token_type": TOKEN_TYPE,
771            "scope": " ".join(access_token.scope),
772            "expires_in": int(
773                timedelta_from_string(self.provider.access_token_validity).total_seconds()
774            ),
775            "id_token": access_token.id_token.to_jwt(self.provider),
776        }
def create_device_code_response(self) -> dict[str, typing.Any]:
778    def create_device_code_response(self) -> dict[str, Any]:
779        """See https://datatracker.ietf.org/doc/html/rfc8628"""
780        if not self.params.device_code.user:
781            raise DeviceCodeError("authorization_pending")
782        now = timezone.now()
783        access_token_expiry = now + timedelta_from_string(self.provider.access_token_validity)
784        auth_event = get_login_event(self.params.device_code.session)
785        access_token = AccessToken(
786            provider=self.provider,
787            user=self.params.device_code.user,
788            expires=access_token_expiry,
789            scope=self.params.device_code.scope,
790            auth_time=auth_event.created if auth_event else now,
791            session=self.params.device_code.session,
792        )
793        access_token.id_token = IDToken.new(
794            self.provider,
795            access_token,
796            self.request,
797        )
798        access_token.save()
799
800        response = {
801            "access_token": access_token.token,
802            "token_type": TOKEN_TYPE,
803            "scope": " ".join(access_token.scope),
804            "expires_in": int(
805                timedelta_from_string(self.provider.access_token_validity).total_seconds()
806            ),
807            "id_token": access_token.id_token.to_jwt(self.provider),
808        }
809
810        if SCOPE_OFFLINE_ACCESS in self.params.device_code.scope:
811            refresh_token_expiry = now + timedelta_from_string(self.provider.refresh_token_validity)
812            refresh_token = RefreshToken(
813                user=self.params.device_code.user,
814                scope=self.params.device_code.scope,
815                expires=refresh_token_expiry,
816                provider=self.provider,
817                auth_time=auth_event.created if auth_event else now,
818            )
819            id_token = IDToken.new(
820                self.provider,
821                refresh_token,
822                self.request,
823            )
824            id_token.at_hash = access_token.at_hash
825            refresh_token.id_token = id_token
826            refresh_token.save()
827            response["refresh_token"] = refresh_token.token
828
829        # Delete device code
830        self.params.device_code.delete()
831        return response