authentik.providers.oauth2.models

OAuth Provider Models

  1"""OAuth Provider Models"""
  2
  3import base64
  4import binascii
  5import json
  6from dataclasses import asdict, dataclass
  7from functools import cached_property
  8from hashlib import sha256
  9from typing import TYPE_CHECKING, Any
 10from urllib.parse import urlparse, urlunparse
 11
 12from cryptography.hazmat.primitives.asymmetric.ec import (
 13    SECP256R1,
 14    SECP384R1,
 15    SECP521R1,
 16    EllipticCurvePrivateKey,
 17)
 18from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
 19from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
 20from dacite import Config
 21from dacite.core import from_dict
 22from django.contrib.postgres.indexes import HashIndex
 23from django.db import models
 24from django.http import HttpRequest
 25from django.templatetags.static import static
 26from django.urls import reverse
 27from django.utils.translation import gettext_lazy as _
 28from jwcrypto.common import json_encode
 29from jwcrypto.jwe import JWE
 30from jwcrypto.jwk import JWK
 31from jwt import encode
 32from rest_framework.serializers import Serializer
 33from structlog.stdlib import get_logger
 34
 35from authentik.brands.models import WebfingerProvider
 36from authentik.common.oauth.constants import SubModes
 37from authentik.core.models import (
 38    AuthenticatedSession,
 39    ExpiringModel,
 40    PropertyMapping,
 41    Provider,
 42    User,
 43)
 44from authentik.crypto.models import CertificateKeyPair
 45from authentik.lib.generators import generate_code_fixed_length, generate_id, generate_key
 46from authentik.lib.models import DomainlessURLValidator, InternallyManagedMixin, SerializerModel
 47from authentik.lib.utils.time import timedelta_string_validator
 48from authentik.sources.oauth.models import OAuthSource
 49
 50if TYPE_CHECKING:
 51    from authentik.providers.oauth2.id_token import IDToken
 52
 53LOGGER = get_logger()
 54
 55
 56def generate_client_secret() -> str:
 57    """Generate client secret with adequate length"""
 58    return generate_id(128)
 59
 60
 61class ClientTypes(models.TextChoices):
 62    """Confidential clients are capable of maintaining the confidentiality
 63    of their credentials. Public clients are incapable."""
 64
 65    CONFIDENTIAL = "confidential", _("Confidential")
 66    PUBLIC = "public", _("Public")
 67
 68
 69class GrantTypes(models.TextChoices):
 70    """OAuth2 Grant types we support"""
 71
 72    AUTHORIZATION_CODE = "authorization_code"
 73    IMPLICIT = "implicit"
 74    HYBRID = "hybrid"
 75
 76
 77class ResponseMode(models.TextChoices):
 78    """https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#OAuth.Post"""
 79
 80    QUERY = "query"
 81    FRAGMENT = "fragment"
 82    FORM_POST = "form_post"
 83
 84
 85class IssuerMode(models.TextChoices):
 86    """Configure how the `iss` field is created."""
 87
 88    GLOBAL = "global", _("Same identifier is used for all providers")
 89    PER_PROVIDER = (
 90        "per_provider",
 91        _("Each provider has a different issuer, based on the application slug."),
 92    )
 93
 94
 95class RedirectURIMatchingMode(models.TextChoices):
 96    STRICT = "strict", _("Strict URL comparison")
 97    REGEX = "regex", _("Regular Expression URL matching")
 98
 99
class OAuth2LogoutMethod(models.TextChoices):
    """OAuth2/OIDC Logout methods"""

    # Each member is a (stored value, translatable label) pair.
    BACKCHANNEL = ("backchannel", _("Back-channel"))
    FRONTCHANNEL = ("frontchannel", _("Front-channel"))
105
106
@dataclass
class RedirectURI:
    """A single redirect URI entry"""

    # How `url` is to be compared (see RedirectURIMatchingMode).
    matching_mode: RedirectURIMatchingMode
    # The URL, or pattern, of this entry.
    url: str
113
114
class ResponseTypes(models.TextChoices):
    """Response Type required by the client."""

    # Each member is a (stored value, translatable label) pair; the label
    # names the flow the response type belongs to.
    CODE = ("code", _("code (Authorization Code Flow)"))
    ID_TOKEN = ("id_token", _("id_token (Implicit Flow)"))
    ID_TOKEN_TOKEN = ("id_token token", _("id_token token (Implicit Flow)"))
    CODE_TOKEN = ("code token", _("code token (Hybrid Flow)"))
    CODE_ID_TOKEN = ("code id_token", _("code id_token (Hybrid Flow)"))
    CODE_ID_TOKEN_TOKEN = (
        "code id_token token",
        _("code id_token token (Hybrid Flow)"),
    )
124
125
class JWTAlgorithms(models.TextChoices):
    """Algorithm used to sign the JWT Token"""

    # Each member is a (stored value, translatable label) pair.
    HS256 = ("HS256", _("HS256 (Symmetric Encryption)"))
    RS256 = ("RS256", _("RS256 (Asymmetric Encryption)"))
    ES256 = ("ES256", _("ES256 (Asymmetric Encryption)"))
    ES384 = ("ES384", _("ES384 (Asymmetric Encryption)"))
    ES512 = ("ES512", _("ES512 (Asymmetric Encryption)"))

    @classmethod
    def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str:
        """Pick the member that matches the given private key.

        RSA keys map to RS256. Elliptic-curve keys map to ES256, ES384 or
        ES512 for the SECP256R1, SECP384R1 and SECP521R1 curves respectively.

        :param private_key The private key to inspect
        :raises ValueError: for any other key (including ``None``) and for
            elliptic-curve keys on a curve not listed above.
        """
        if isinstance(private_key, RSAPrivateKey):
            return cls.RS256
        if isinstance(private_key, EllipticCurvePrivateKey):
            curve = private_key.curve
            # Checked in the same order as the supported curves are listed.
            algorithm_by_curve = (
                (SECP256R1, cls.ES256),
                (SECP384R1, cls.ES384),
                (SECP521R1, cls.ES512),
            )
            for curve_type, algorithm in algorithm_by_curve:
                if isinstance(curve, curve_type):
                    return algorithm
        raise ValueError(f"Invalid private key type: {type(private_key)}")
148
149
class ScopeMapping(PropertyMapping):
    """Map an OAuth Scope to users properties"""

    scope_name = models.TextField(help_text=_("Scope used by the client"))
    description = models.TextField(
        help_text=_(
            "Description shown to the user when consenting. "
            "If left empty, the user won't be informed."
        ),
        blank=True,
    )

    class Meta:
        verbose_name = _("Scope Mapping")
        verbose_name_plural = _("Scope Mappings")

    def __str__(self):
        return f"Scope Mapping {self.name} ({self.scope_name})"

    @property
    def component(self) -> str:
        """Return component used to edit this object"""
        return "ak-property-mapping-provider-scope-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.api.scopes import (
            ScopeMappingSerializer as serializer_class,
        )

        return serializer_class
178
179
class OAuth2Provider(WebfingerProvider, Provider):
    """OAuth2 Provider for generic OAuth and OpenID Connect Applications."""

    # --- Client credentials and endpoints ---------------------------------
    client_type = models.CharField(
        max_length=30,
        choices=ClientTypes.choices,
        default=ClientTypes.CONFIDENTIAL,
        verbose_name=_("Client Type"),
        help_text=_(
            "Confidential clients are capable of maintaining the confidentiality "
            "of their credentials. Public clients are incapable"
        ),
    )
    client_id = models.CharField(
        max_length=255,
        unique=True,
        verbose_name=_("Client ID"),
        default=generate_id,
    )
    client_secret = models.CharField(
        max_length=255,
        blank=True,
        verbose_name=_("Client Secret"),
        default=generate_client_secret,
    )
    # Raw JSON storage; read and written through the `redirect_uris` property.
    _redirect_uris = models.JSONField(
        default=list,
        verbose_name=_("Redirect URIs"),
    )
    logout_uri = models.TextField(
        validators=[DomainlessURLValidator(schemes=("http", "https"))],
        verbose_name=_("Logout URI"),
        blank=True,
    )
    logout_method = models.TextField(
        choices=OAuth2LogoutMethod.choices,
        default=OAuth2LogoutMethod.BACKCHANNEL,
        verbose_name=_("Logout Method"),
        help_text=_(
            "Backchannel logs out with server to server calls. "
            "Frontchannel uses iframes in your browser"
        ),
    )

    include_claims_in_id_token = models.BooleanField(
        default=True,
        verbose_name=_("Include claims in id_token"),
        help_text=_(
            "Include User claims from scopes in the id_token, for applications "
            "that don't access the userinfo endpoint."
        ),
    )

    # --- Validity periods (timedelta strings) -----------------------------
    access_code_validity = models.TextField(
        default="minutes=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Access codes not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    access_token_validity = models.TextField(
        default="hours=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_validity = models.TextField(
        default="days=30",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_threshold = models.TextField(
        default="seconds=0",
        validators=[timedelta_string_validator],
        help_text=_(
            "When refreshing a token, if the refresh token is valid for less than "
            "this duration, it will be renewed. "
            "When set to seconds=0, token will always be renewed. "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )

    # --- Subject / issuer configuration -----------------------------------
    sub_mode = models.TextField(
        choices=SubModes.choices,
        default=SubModes.HASHED_USER_ID,
        help_text=_(
            "Configure what data should be used as unique User Identifier. For most cases, "
            "the default should be fine."
        ),
    )
    issuer_mode = models.TextField(
        choices=IssuerMode.choices,
        default=IssuerMode.PER_PROVIDER,
        help_text=_("Configure how the issuer field of the ID Token should be filled."),
    )

    # --- Keys -------------------------------------------------------------
    signing_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Signing Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_("Key used to sign the tokens."),
        related_name="oauth2provider_signing_key_set",
    )
    encryption_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Encryption Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_(
            "Key used to encrypt the tokens. When set, "
            "tokens will be encrypted and returned as JWEs."
        ),
        related_name="oauth2provider_encryption_key_set",
    )

    # --- JWT federation ---------------------------------------------------
    jwt_federation_sources = models.ManyToManyField(
        OAuthSource,
        verbose_name=_(
            "Any JWT signed by the JWK of the selected source can be used to authenticate."
        ),
        related_name="oauth2_providers",
        default=None,
        blank=True,
    )
    jwt_federation_providers = models.ManyToManyField("OAuth2Provider", blank=True, default=None)

    class Meta:
        verbose_name = _("OAuth2/OpenID Provider")
        verbose_name_plural = _("OAuth2/OpenID Providers")

    def __str__(self):
        return f"OAuth2 Provider {self.name}"

    @cached_property
    def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
        """Return the ``(key, algorithm)`` pair used for signing.

        With a signing key configured, this is its private key and the
        algorithm derived from it; otherwise the client secret with HS256.
        """
        signing_pair: CertificateKeyPair | None = self.signing_key
        if signing_pair:
            private_key = signing_pair.private_key
            return private_key, JWTAlgorithms.from_private_key(private_key)
        # No Certificate at all, assume HS256
        return self.client_secret, JWTAlgorithms.HS256

    def get_issuer(self, request: HttpRequest) -> str | None:
        """Build the issuer URL for this provider relative to ``request``.

        In global issuer mode this is the absolute root-redirect URL. Otherwise
        it is the absolute provider-root URL for the application's slug, or
        ``None`` when no application is related to this provider.
        """
        if self.issuer_mode == IssuerMode.GLOBAL:
            root_path = reverse("authentik_core:root-redirect")
            return request.build_absolute_uri(root_path)
        try:
            provider_path = reverse(
                "authentik_providers_oauth2:provider-root",
                kwargs={"application_slug": self.application.slug},
            )
            return request.build_absolute_uri(provider_path)
        except Provider.application.RelatedObjectDoesNotExist:
            return None

    @property
    def redirect_uris(self) -> list[RedirectURI]:
        """Stored redirect URI entries converted to ``RedirectURI`` objects."""
        return [
            from_dict(
                RedirectURI,
                raw_entry,
                # Convert the stored matching-mode value into the enum member.
                config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
            )
            for raw_entry in self._redirect_uris
        ]

    @redirect_uris.setter
    def redirect_uris(self, value: list[RedirectURI]):
        """Store ``value`` as a list of plain dicts in ``_redirect_uris``."""
        self._redirect_uris = [asdict(uri) for uri in value]

    @property
    def launch_url(self) -> str | None:
        """Guess launch_url based on first redirect_uri"""
        uris = self.redirect_uris
        if not uris:
            return None
        first_url = uris[0].url
        try:
            # Drop the path component; the other URL components are kept.
            without_path = urlparse(first_url)._replace(path="")
            return urlunparse(without_path)
        except ValueError as exc:
            LOGGER.warning("Failed to format launch url", exc=exc)
            return None

    @property
    def icon_url(self) -> str | None:
        """Return the static-files URL of ``authentik/sources/openidconnect.svg``."""
        return static("authentik/sources/openidconnect.svg")

    @property
    def component(self) -> str:
        """Return component used to edit this object"""
        return "ak-provider-oauth2-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.api.providers import (
            OAuth2ProviderSerializer as serializer_class,
        )

        return serializer_class

    def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
        """Represent the ID Token as a JSON Web Token (JWT).

        The token is signed with the key and algorithm from ``jwt_key``. When an
        encryption key is configured, the signed token is passed through
        ``encrypt`` and the result of that is returned instead.

        :param payload The payload to encode into the JWT
        :param jwt_type The type of the JWT. This will be put in the JWT header using the `typ`
            parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of `JWT`.
        """
        jwt_headers = {}
        if self.signing_key:
            jwt_headers["kid"] = self.signing_key.kid
        if jwt_type is not None:
            jwt_headers["typ"] = jwt_type
        signing_key, algorithm = self.jwt_key
        # `encode` here is the module-level import from `jwt`, not this method.
        signed = encode(payload, signing_key, algorithm=algorithm, headers=jwt_headers)
        return self.encrypt(signed) if self.encryption_key else signed

    def encrypt(self, raw: str) -> str:
        """Encrypt JWT

        Wraps ``raw`` in a JWE addressed to the key loaded from the encryption
        key's certificate data and returns the compact serialization.
        """
        recipient_key = JWK.from_pem(self.encryption_key.certificate_data.encode())
        protected_header = json_encode(
            {
                "alg": "RSA-OAEP-256",
                "enc": "A256CBC-HS512",
                "typ": "JWE",
                "kid": self.encryption_key.kid,
            }
        )
        jwe = JWE(raw, protected_header)
        jwe.add_recipient(recipient_key)
        return jwe.serialize(compact=True)

    def webfinger(self, resource: str, request: HttpRequest):
        """Return a webfinger document for ``resource``.

        The document contains a single OpenID Connect issuer link pointing at
        the absolute provider-root URL for the application's slug.
        """
        provider_path = reverse(
            "authentik_providers_oauth2:provider-root",
            kwargs={"application_slug": self.application.slug},
        )
        issuer_link = {
            "rel": "http://openid.net/specs/connect/1.0/issuer",
            "href": request.build_absolute_uri(provider_path),
        }
        return {
            "subject": resource,
            "links": [issuer_link],
        }
445
446
class BaseGrantModel(models.Model):
    """Base Model for all grants"""

    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
    revoked = models.BooleanField(default=False)
    # Whitespace-separated scope names; use the `scope` property for list access.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    auth_time = models.DateTimeField(verbose_name="Authentication time")
    session = models.ForeignKey(
        AuthenticatedSession,
        null=True,
        on_delete=models.CASCADE,
        default=None,
    )

    class Meta:
        abstract = True

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        stored = self._scope
        return stored.split()

    @scope.setter
    def scope(self, value):
        """Store the given scope names joined by single spaces."""
        joined = " ".join(value)
        self._scope = joined
470
471
class AuthorizationCode(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Authorization Code"""

    code = models.CharField(max_length=255, unique=True, verbose_name=_("Code"))
    nonce = models.TextField(null=True, default=None, verbose_name=_("Nonce"))
    code_challenge = models.CharField(max_length=255, null=True, verbose_name=_("Code Challenge"))
    code_challenge_method = models.CharField(
        max_length=255, null=True, verbose_name=_("Code Challenge Method")
    )

    class Meta:
        verbose_name = _("Authorization Code")
        verbose_name_plural = _("Authorization Codes")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self) -> str:
        return f"Authorization code for {self.provider_id} for user {self.user_id}"

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model

        Returns the serializer class itself (not an instance), hence the
        ``type[Serializer]`` annotation.
        """
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer

        return ExpiringBaseGrantModelSerializer

    @property
    def c_hash(self) -> str:
        """https://openid.net/specs/openid-connect-core-1_0.html#IDToken

        Base64url encoding, without ``=`` padding, of the left half of the
        SHA-256 digest of the ASCII-encoded code.
        """
        hashed_code = sha256(self.code.encode("ascii")).hexdigest().encode("ascii")
        # Half of the hex characters corresponds to the left half of the digest.
        left_half = binascii.unhexlify(hashed_code[: len(hashed_code) // 2])
        return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")
505
506
class AccessToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 access token, non-opaque using a JWT as identifier"""

    token = models.TextField()
    # JSON-serialized ID token; read and written through the `id_token` property.
    _id_token = models.TextField()

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Access Token")
        verbose_name_plural = _("OAuth2 Access Tokens")

    def __str__(self) -> str:
        return f"Access Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        """Set the ID token.

        Also sets ``token`` to the result of ``value.to_access_token`` for this
        token's provider, and stores ``value`` as JSON in ``_id_token``.
        """
        self.token = value.to_access_token(self.provider, self)
        self._id_token = json.dumps(asdict(value))

    @property
    def at_hash(self) -> str:
        """Get hashed access_token

        Base64url encoding, without ``=`` padding, of the left half of the
        SHA-256 digest of the ASCII-encoded token.
        """
        hashed_access_token = sha256(self.token.encode("ascii")).hexdigest().encode("ascii")
        # Half of the hex characters corresponds to the left half of the digest.
        left_half = binascii.unhexlify(hashed_access_token[: len(hashed_access_token) // 2])
        return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model

        Returns the serializer class itself (not an instance), hence the
        ``type[Serializer]`` annotation.
        """
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer
553
554
class RefreshToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Refresh Token, opaque"""

    token = models.TextField(default=generate_client_secret)
    # JSON-serialized ID token; read and written through the `id_token` property.
    _id_token = models.TextField(verbose_name=_("ID Token"))
    # Shadow the `session` field from `BaseGrantModel` as we want refresh tokens to persist even
    # when the session is terminated.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Refresh Token")
        verbose_name_plural = _("OAuth2 Refresh Tokens")

    def __str__(self) -> str:
        return f"Refresh Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        """Store ``value`` as JSON in ``_id_token``."""
        self._id_token = json.dumps(asdict(value))

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model

        Returns the serializer class itself (not an instance), hence the
        ``type[Serializer]`` annotation.
        """
        # Imported inside the property rather than at module level.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer
593
594
class DeviceToken(InternallyManagedMixin, ExpiringModel):
    """Temporary device token for OAuth device flow"""

    user = models.ForeignKey(
        "authentik_core.User",
        default=None,
        on_delete=models.CASCADE,
        null=True,
    )
    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    device_code = models.TextField(default=generate_key)
    user_code = models.TextField(default=generate_code_fixed_length)
    # Whitespace-separated scope names; use the `scope` property for list access.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    session = models.ForeignKey(
        AuthenticatedSession,
        null=True,
        on_delete=models.SET_DEFAULT,
        default=None,
    )

    class Meta:
        verbose_name = _("Device Token")
        verbose_name_plural = _("Device Tokens")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Device Token for {self.provider_id}"

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        stored = self._scope
        return stored.split()

    @scope.setter
    def scope(self, value):
        """Store the given scope names joined by single spaces."""
        joined = " ".join(value)
        self._scope = joined
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def generate_client_secret() -> str:
57def generate_client_secret() -> str:
58    """Generate client secret with adequate length"""
59    return generate_id(128)

Generate client secret with adequate length

class ClientTypes(django.db.models.enums.TextChoices):
62class ClientTypes(models.TextChoices):
63    """Confidential clients are capable of maintaining the confidentiality
64    of their credentials. Public clients are incapable."""
65
66    CONFIDENTIAL = "confidential", _("Confidential")
67    PUBLIC = "public", _("Public")

Confidential clients are capable of maintaining the confidentiality of their credentials. Public clients are incapable.

CONFIDENTIAL = ClientTypes.CONFIDENTIAL
class GrantTypes(django.db.models.enums.TextChoices):
70class GrantTypes(models.TextChoices):
71    """OAuth2 Grant types we support"""
72
73    AUTHORIZATION_CODE = "authorization_code"
74    IMPLICIT = "implicit"
75    HYBRID = "hybrid"

OAuth2 Grant types we support

AUTHORIZATION_CODE = GrantTypes.AUTHORIZATION_CODE
IMPLICIT = GrantTypes.IMPLICIT
class ResponseMode(django.db.models.enums.TextChoices):
78class ResponseMode(models.TextChoices):
79    """https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#OAuth.Post"""
80
81    QUERY = "query"
82    FRAGMENT = "fragment"
83    FORM_POST = "form_post"
class IssuerMode(django.db.models.enums.TextChoices):
86class IssuerMode(models.TextChoices):
87    """Configure how the `iss` field is created."""
88
89    GLOBAL = "global", _("Same identifier is used for all providers")
90    PER_PROVIDER = (
91        "per_provider",
92        _("Each provider has a different issuer, based on the application slug."),
93    )

Configure how the iss field is created.

PER_PROVIDER = IssuerMode.PER_PROVIDER
class RedirectURIMatchingMode(django.db.models.enums.TextChoices):
96class RedirectURIMatchingMode(models.TextChoices):
97    STRICT = "strict", _("Strict URL comparison")
98    REGEX = "regex", _("Regular Expression URL matching")

Class for creating enumerated string choices.

class OAuth2LogoutMethod(django.db.models.enums.TextChoices):
101class OAuth2LogoutMethod(models.TextChoices):
102    """OAuth2/OIDC Logout methods"""
103
104    BACKCHANNEL = "backchannel", _("Back-channel")
105    FRONTCHANNEL = "frontchannel", _("Front-channel")

OAuth2/OIDC Logout methods

@dataclass
class RedirectURI:
108@dataclass
109class RedirectURI:
110    """A single redirect URI entry"""
111
112    matching_mode: RedirectURIMatchingMode
113    url: str

A single redirect URI entry

RedirectURI( matching_mode: RedirectURIMatchingMode, url: str)
matching_mode: RedirectURIMatchingMode
url: str
class ResponseTypes(django.db.models.enums.TextChoices):
116class ResponseTypes(models.TextChoices):
117    """Response Type required by the client."""
118
119    CODE = "code", _("code (Authorization Code Flow)")
120    ID_TOKEN = "id_token", _("id_token (Implicit Flow)")
121    ID_TOKEN_TOKEN = "id_token token", _("id_token token (Implicit Flow)")
122    CODE_TOKEN = "code token", _("code token (Hybrid Flow)")
123    CODE_ID_TOKEN = "code id_token", _("code id_token (Hybrid Flow)")
124    CODE_ID_TOKEN_TOKEN = "code id_token token", _("code id_token token (Hybrid Flow)")

Response Type required by the client.

ID_TOKEN_TOKEN = ResponseTypes.ID_TOKEN_TOKEN
CODE_ID_TOKEN = ResponseTypes.CODE_ID_TOKEN
CODE_ID_TOKEN_TOKEN = ResponseTypes.CODE_ID_TOKEN_TOKEN
class JWTAlgorithms(django.db.models.enums.TextChoices):
127class JWTAlgorithms(models.TextChoices):
128    """Algorithm used to sign the JWT Token"""
129
130    HS256 = "HS256", _("HS256 (Symmetric Encryption)")
131    RS256 = "RS256", _("RS256 (Asymmetric Encryption)")
132    ES256 = "ES256", _("ES256 (Asymmetric Encryption)")
133    ES384 = "ES384", _("ES384 (Asymmetric Encryption)")
134    ES512 = "ES512", _("ES512 (Asymmetric Encryption)")
135
136    @classmethod
137    def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str:
138        if isinstance(private_key, RSAPrivateKey):
139            return cls.RS256
140        if isinstance(private_key, EllipticCurvePrivateKey):
141            curve = private_key.curve
142            if isinstance(curve, SECP256R1):
143                return cls.ES256
144            if isinstance(curve, SECP384R1):
145                return cls.ES384
146            if isinstance(curve, SECP521R1):
147                return cls.ES512
148        raise ValueError(f"Invalid private key type: {type(private_key)}")

Algorithm used to sign the JWT Token

@classmethod
def from_private_key( cls, private_key: cryptography.hazmat.primitives.asymmetric.dh.DHPrivateKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey | cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey | cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey | None) -> str:
136    @classmethod
137    def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str:
138        if isinstance(private_key, RSAPrivateKey):
139            return cls.RS256
140        if isinstance(private_key, EllipticCurvePrivateKey):
141            curve = private_key.curve
142            if isinstance(curve, SECP256R1):
143                return cls.ES256
144            if isinstance(curve, SECP384R1):
145                return cls.ES384
146            if isinstance(curve, SECP521R1):
147                return cls.ES512
148        raise ValueError(f"Invalid private key type: {type(private_key)}")
class ScopeMapping(authentik.core.models.PropertyMapping):
151class ScopeMapping(PropertyMapping):
152    """Map an OAuth Scope to users properties"""
153
154    scope_name = models.TextField(help_text=_("Scope used by the client"))
155    description = models.TextField(
156        blank=True,
157        help_text=_(
158            "Description shown to the user when consenting. "
159            "If left empty, the user won't be informed."
160        ),
161    )
162
163    @property
164    def component(self) -> str:
165        return "ak-property-mapping-provider-scope-form"
166
167    @property
168    def serializer(self) -> type[Serializer]:
169        from authentik.providers.oauth2.api.scopes import ScopeMappingSerializer
170
171        return ScopeMappingSerializer
172
173    def __str__(self):
174        return f"Scope Mapping {self.name} ({self.scope_name})"
175
176    class Meta:
177        verbose_name = _("Scope Mapping")
178        verbose_name_plural = _("Scope Mappings")

Map an OAuth Scope to users properties

def scope_name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def description(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

component: str
163    @property
164    def component(self) -> str:
165        return "ak-property-mapping-provider-scope-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
167    @property
168    def serializer(self) -> type[Serializer]:
169        from authentik.providers.oauth2.api.scopes import ScopeMappingSerializer
170
171        return ScopeMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class ScopeMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class ScopeMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

181class OAuth2Provider(WebfingerProvider, Provider):
182    """OAuth2 Provider for generic OAuth and OpenID Connect Applications."""
183
184    client_type = models.CharField(
185        max_length=30,
186        choices=ClientTypes.choices,
187        default=ClientTypes.CONFIDENTIAL,
188        verbose_name=_("Client Type"),
189        help_text=_(
190            "Confidential clients are capable of maintaining the confidentiality "
191            "of their credentials. Public clients are incapable"
192        ),
193    )
194    client_id = models.CharField(
195        max_length=255,
196        unique=True,
197        verbose_name=_("Client ID"),
198        default=generate_id,
199    )
200    client_secret = models.CharField(  # also used as the HS256 signing key when no signing_key is set (see jwt_key)
201        max_length=255,
202        blank=True,
203        verbose_name=_("Client Secret"),
204        default=generate_client_secret,
205    )
206    _redirect_uris = models.JSONField(  # raw JSON list; read and written through the redirect_uris property
207        default=list,
208        verbose_name=_("Redirect URIs"),
209    )
210    logout_uri = models.TextField(
211        validators=[DomainlessURLValidator(schemes=("http", "https"))],
212        verbose_name=_("Logout URI"),
213        blank=True,
214    )
215    logout_method = models.TextField(
216        choices=OAuth2LogoutMethod.choices,
217        default=OAuth2LogoutMethod.BACKCHANNEL,
218        verbose_name=_("Logout Method"),
219        help_text=_(
220            "Backchannel logs out with server to server calls. "
221            "Frontchannel uses iframes in your browser"
222        ),
223    )
224
225    include_claims_in_id_token = models.BooleanField(
226        default=True,
227        verbose_name=_("Include claims in id_token"),
228        help_text=_(
229            "Include User claims from scopes in the id_token, for applications "
230            "that don't access the userinfo endpoint."
231        ),
232    )
233
234    access_code_validity = models.TextField(
235        default="minutes=1",
236        validators=[timedelta_string_validator],
237        help_text=_(
238            "Access codes not valid on or after current time + this value "
239            "(Format: hours=1;minutes=2;seconds=3)."
240        ),
241    )
242    access_token_validity = models.TextField(
243        default="hours=1",
244        validators=[timedelta_string_validator],
245        help_text=_(
246            "Tokens not valid on or after current time + this value "
247            "(Format: hours=1;minutes=2;seconds=3)."
248        ),
249    )
250    refresh_token_validity = models.TextField(
251        default="days=30",
252        validators=[timedelta_string_validator],
253        help_text=_(
254            "Tokens not valid on or after current time + this value "
255            "(Format: hours=1;minutes=2;seconds=3)."
256        ),
257    )
258    refresh_token_threshold = models.TextField(
259        default="seconds=0",
260        validators=[timedelta_string_validator],
261        help_text=_(
262            "When refreshing a token, if the refresh token is valid for less than "
263            "this duration, it will be renewed. "
264            "When set to seconds=0, token will always be renewed. "
265            "(Format: hours=1;minutes=2;seconds=3)."
266        ),
267    )
268
269    sub_mode = models.TextField(
270        choices=SubModes.choices,
271        default=SubModes.HASHED_USER_ID,
272        help_text=_(
273            "Configure what data should be used as unique User Identifier. For most cases, "
274            "the default should be fine."
275        ),
276    )
277    issuer_mode = models.TextField(
278        choices=IssuerMode.choices,
279        default=IssuerMode.PER_PROVIDER,
280        help_text=_("Configure how the issuer field of the ID Token should be filled."),
281    )
282
283    signing_key = models.ForeignKey(
284        CertificateKeyPair,
285        verbose_name=_("Signing Key"),
286        on_delete=models.SET_NULL,
287        null=True,
288        help_text=_("Key used to sign the tokens."),
289        related_name="oauth2provider_signing_key_set",
290    )
291    encryption_key = models.ForeignKey(
292        CertificateKeyPair,
293        verbose_name=_("Encryption Key"),
294        on_delete=models.SET_NULL,
295        null=True,
296        help_text=_(
297            "Key used to encrypt the tokens. When set, "
298            "tokens will be encrypted and returned as JWEs."
299        ),
300        related_name="oauth2provider_encryption_key_set",
301    )
302
303    jwt_federation_sources = models.ManyToManyField(
304        OAuthSource,
305        verbose_name=_(
306            "Any JWT signed by the JWK of the selected source can be used to authenticate."
307        ),
308        related_name="oauth2_providers",
309        default=None,
310        blank=True,
311    )
312    jwt_federation_providers = models.ManyToManyField("OAuth2Provider", blank=True, default=None)
313
314    @cached_property
315    def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
316        """Return the (key, algorithm) pair used for signing: signing key's private key, or the client secret"""
317        if not self.signing_key:
318            # No Certificate at all, assume HS256
319            return self.client_secret, JWTAlgorithms.HS256
320        key: CertificateKeyPair = self.signing_key
321        private_key = key.private_key
322        return private_key, JWTAlgorithms.from_private_key(private_key)  # algorithm is derived from the private key
323
324    def get_issuer(self, request: HttpRequest) -> str | None:
325        """Return the absolute issuer URL for this request, or None when no related application exists"""
326        if self.issuer_mode == IssuerMode.GLOBAL:
327            return request.build_absolute_uri(reverse("authentik_core:root-redirect"))
328        try:
329            url = reverse(
330                "authentik_providers_oauth2:provider-root",
331                kwargs={
332                    "application_slug": self.application.slug,
333                },
334            )
335            return request.build_absolute_uri(url)
336        except Provider.application.RelatedObjectDoesNotExist:
337            return None
338
339    @property
340    def redirect_uris(self) -> list[RedirectURI]:  # builds RedirectURI objects from the stored _redirect_uris entries
341        uris = []
342        for entry in self._redirect_uris:
343            uris.append(
344                from_dict(
345                    RedirectURI,
346                    entry,
347                    config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
348                )
349            )
350        return uris
351
352    @redirect_uris.setter
353    def redirect_uris(self, value: list[RedirectURI]):  # stores each entry as a plain dict via asdict()
354        cleansed = []
355        for entry in value:
356            cleansed.append(asdict(entry))
357        self._redirect_uris = cleansed
358
359    @property
360    def launch_url(self) -> str | None:
361        """Guess launch_url based on first redirect_uri; None if there are no redirect URIs or formatting fails"""
362        redirects = self.redirect_uris
363        if len(redirects) < 1:
364            return None
365        main_url = redirects[0].url
366        try:
367            launch_url = urlparse(main_url)._replace(path="")  # blank out only the path component
368            return urlunparse(launch_url)
369        except ValueError as exc:
370            LOGGER.warning("Failed to format launch url", exc=exc)
371            return None
372
373    @property
374    def icon_url(self) -> str | None:  # static URL of the bundled OpenID Connect icon
375        return static("authentik/sources/openidconnect.svg")
376
377    @property
378    def component(self) -> str:  # name of the UI component used to edit this object
379        return "ak-provider-oauth2-form"
380
381    @property
382    def serializer(self) -> type[Serializer]:  # returns the serializer class, not an instance
383        from authentik.providers.oauth2.api.providers import OAuth2ProviderSerializer  # local import, presumably to avoid a circular import (confirm)
384
385        return OAuth2ProviderSerializer
386
387    def __str__(self):
388        return f"OAuth2 Provider {self.name}"
389
390    def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
391        """Sign the payload as a JSON Web Token (JWT); the result is wrapped in a JWE if an encryption key is set.
392
393        :param payload: The payload to encode into the JWT
394        :param jwt_type: The type of the JWT. This will be put in the JWT header using the `typ`
395            parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of `JWT`.
396        """
397        headers = {}
398        if self.signing_key:
399            headers["kid"] = self.signing_key.kid  # key ID header is only set when a signing key is configured
400        if jwt_type is not None:
401            headers["typ"] = jwt_type
402        key, alg = self.jwt_key
403        encoded = encode(payload, key, algorithm=alg, headers=headers)  # `encode` here is jwt.encode, not this method
404        if self.encryption_key:
405            return self.encrypt(encoded)
406        return encoded
407
408    def encrypt(self, raw: str) -> str:
409        """Encrypt a JWT into a compact-serialized JWE for the configured encryption key's certificate"""
410        key = JWK.from_pem(self.encryption_key.certificate_data.encode())
411        jwe = JWE(
412            raw,
413            json_encode(
414                {
415                    "alg": "RSA-OAEP-256",
416                    "enc": "A256CBC-HS512",
417                    "typ": "JWE",
418                    "kid": self.encryption_key.kid,
419                }
420            ),
421        )
422        jwe.add_recipient(key)
423        return jwe.serialize(compact=True)
424
425    def webfinger(self, resource: str, request: HttpRequest):  # NOTE(review): unlike get_issuer, a missing application is not caught here
426        return {
427            "subject": resource,
428            "links": [
429                {
430                    "rel": "http://openid.net/specs/connect/1.0/issuer",
431                    "href": request.build_absolute_uri(
432                        reverse(
433                            "authentik_providers_oauth2:provider-root",
434                            kwargs={
435                                "application_slug": self.application.slug,
436                            },
437                        )
438                    ),
439                },
440            ],
441        }
442
443    class Meta:
444        verbose_name = _("OAuth2/OpenID Provider")
445        verbose_name_plural = _("OAuth2/OpenID Providers")

OAuth2 Provider for generic OAuth and OpenID Connect Applications.

def client_type(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_secret(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def logout_uri(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def logout_method(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def include_claims_in_id_token(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def access_code_validity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def access_token_validity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def refresh_token_validity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def refresh_token_threshold(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sub_mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def issuer_mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

signing_key

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

encryption_key

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

jwt_federation_sources

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

jwt_federation_providers

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

jwt_key: tuple[str | cryptography.hazmat.primitives.asymmetric.dh.DHPrivateKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey | cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey | cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey, str]
314    @cached_property
315    def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
316        """Return the (key, algorithm) pair used for signing: signing key's private key, or the client secret"""
317        if not self.signing_key:
318            # No Certificate at all, assume HS256
319            return self.client_secret, JWTAlgorithms.HS256
320        key: CertificateKeyPair = self.signing_key
321        private_key = key.private_key
322        return private_key, JWTAlgorithms.from_private_key(private_key)  # algorithm is derived from the private key

Get either the configured certificate or the client secret

def get_issuer(self, request: django.http.request.HttpRequest) -> str | None:
324    def get_issuer(self, request: HttpRequest) -> str | None:
325        """Return the absolute issuer URL for this request, or None when no related application exists"""
326        if self.issuer_mode == IssuerMode.GLOBAL:
327            return request.build_absolute_uri(reverse("authentik_core:root-redirect"))
328        try:
329            url = reverse(
330                "authentik_providers_oauth2:provider-root",
331                kwargs={
332                    "application_slug": self.application.slug,
333                },
334            )
335            return request.build_absolute_uri(url)
336        except Provider.application.RelatedObjectDoesNotExist:
337            return None

Get issuer, based on request

redirect_uris: list[RedirectURI]
339    @property
340    def redirect_uris(self) -> list[RedirectURI]:  # builds RedirectURI objects from the stored _redirect_uris entries
341        uris = []
342        for entry in self._redirect_uris:
343            uris.append(
344                from_dict(
345                    RedirectURI,
346                    entry,
347                    config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
348                )
349            )
350        return uris
launch_url: str | None
359    @property
360    def launch_url(self) -> str | None:
361        """Guess launch_url based on first redirect_uri; None if there are no redirect URIs or formatting fails"""
362        redirects = self.redirect_uris
363        if len(redirects) < 1:
364            return None
365        main_url = redirects[0].url
366        try:
367            launch_url = urlparse(main_url)._replace(path="")  # blank out only the path component
368            return urlunparse(launch_url)
369        except ValueError as exc:
370            LOGGER.warning("Failed to format launch url", exc=exc)
371            return None

Guess launch_url based on first redirect_uri

icon_url: str | None
373    @property
374    def icon_url(self) -> str | None:  # static URL of the bundled OpenID Connect icon
375        return static("authentik/sources/openidconnect.svg")
component: str
377    @property
378    def component(self) -> str:  # name of the UI component used to edit this object
379        return "ak-provider-oauth2-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
381    @property
382    def serializer(self) -> type[Serializer]:  # returns the serializer class, not an instance
383        from authentik.providers.oauth2.api.providers import OAuth2ProviderSerializer  # local import, presumably to avoid a circular import (confirm)
384
385        return OAuth2ProviderSerializer

Get serializer for this model

def encode(self, payload: dict[str, typing.Any], jwt_type: str | None = None) -> str:
390    def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
391        """Sign the payload as a JSON Web Token (JWT); the result is wrapped in a JWE if an encryption key is set.
392
393        :param payload: The payload to encode into the JWT
394        :param jwt_type: The type of the JWT. This will be put in the JWT header using the `typ`
395            parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of `JWT`.
396        """
397        headers = {}
398        if self.signing_key:
399            headers["kid"] = self.signing_key.kid  # key ID header is only set when a signing key is configured
400        if jwt_type is not None:
401            headers["typ"] = jwt_type
402        key, alg = self.jwt_key
403        encoded = encode(payload, key, algorithm=alg, headers=headers)  # `encode` here is jwt.encode, not this method
404        if self.encryption_key:
405            return self.encrypt(encoded)
406        return encoded

Represent the ID Token as a JSON Web Token (JWT).

Parameters:

- payload: The payload to encode into the JWT.
- jwt_type: The type of the JWT. This is put in the JWT header using the `typ` parameter (see RFC 7515, Section 4.1.9). If not set, it falls back to the default of `JWT`.

def encrypt(self, raw: str) -> str:
408    def encrypt(self, raw: str) -> str:
409        """Encrypt a JWT into a compact-serialized JWE for the configured encryption key's certificate"""
410        key = JWK.from_pem(self.encryption_key.certificate_data.encode())
411        jwe = JWE(
412            raw,
413            json_encode(
414                {
415                    "alg": "RSA-OAEP-256",
416                    "enc": "A256CBC-HS512",
417                    "typ": "JWE",
418                    "kid": self.encryption_key.kid,
419                }
420            ),
421        )
422        jwe.add_recipient(key)
423        return jwe.serialize(compact=True)

Encrypt JWT

def webfinger(self, resource: str, request: django.http.request.HttpRequest):
425    def webfinger(self, resource: str, request: HttpRequest):  # NOTE(review): unlike get_issuer, a missing application is not caught here
426        return {
427            "subject": resource,
428            "links": [
429                {
430                    "rel": "http://openid.net/specs/connect/1.0/issuer",
431                    "href": request.build_absolute_uri(
432                        reverse(
433                            "authentik_providers_oauth2:provider-root",
434                            kwargs={
435                                "application_slug": self.application.slug,
436                            },
437                        )
438                    ),
439                },
440            ],
441        }
def get_client_type_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_logout_method_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_sub_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_issuer_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

signing_key_id
encryption_key_id
provider_ptr_id
provider_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

agentconnector_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

oauth2provider_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

authorizationcode_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

accesstoken_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

refreshtoken_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

devicetoken_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

proxyprovider

Accessor to the related object on the reverse side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Place.restaurant is a ReverseOneToOneDescriptor instance.

ssfprovider_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class OAuth2Provider.DoesNotExist(authentik.core.models.Provider.DoesNotExist):

The requested object does not exist

class OAuth2Provider.MultipleObjectsReturned(authentik.core.models.Provider.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class BaseGrantModel(django.db.models.base.Model):
448class BaseGrantModel(models.Model):
449    """Abstract base model for all grants: provider, user, revocation flag, scopes, auth time and session"""
450
451    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
452    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
453    revoked = models.BooleanField(default=False)
454    _scope = models.TextField(default="", verbose_name=_("Scopes"))  # whitespace-separated; use the scope property
455    auth_time = models.DateTimeField(verbose_name="Authentication time")
456    session = models.ForeignKey(
457        AuthenticatedSession, null=True, on_delete=models.CASCADE, default=None
458    )
459
460    class Meta:
461        abstract = True
462
463    @property
464    def scope(self) -> list[str]:
465        """Return scopes as list of strings"""
466        return self._scope.split()
467
468    @scope.setter
469    def scope(self, value):  # joins the given scope strings with single spaces into _scope
470        self._scope = " ".join(value)

Base Model for all grants

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def revoked(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def auth_time(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

scope: list[str]
463    @property
464    def scope(self) -> list[str]:
465        """Return scopes as list of strings, split from _scope on whitespace"""
466        return self._scope.split()

Return scopes as list of strings

provider_id
user_id
def get_next_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

session_id
class BaseGrantModel.Meta:
460    class Meta:
461        abstract = True  # marks BaseGrantModel as an abstract Django model
abstract = False
473class AuthorizationCode(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
474    """OAuth2 Authorization Code"""
475
476    code = models.CharField(max_length=255, unique=True, verbose_name=_("Code"))
477    nonce = models.TextField(null=True, default=None, verbose_name=_("Nonce"))
478    code_challenge = models.CharField(max_length=255, null=True, verbose_name=_("Code Challenge"))
479    code_challenge_method = models.CharField(
480        max_length=255, null=True, verbose_name=_("Code Challenge Method")
481    )
482
483    class Meta:
484        verbose_name = _("Authorization Code")
485        verbose_name_plural = _("Authorization Codes")
486        indexes = ExpiringModel.Meta.indexes
487
488    def __str__(self):
489        return f"Authorization code for {self.provider_id} for user {self.user_id}"
490
491    @property
492    def serializer(self) -> type[Serializer]:
493        from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer  # local import, presumably to avoid a circular import (confirm)
494
495        return ExpiringBaseGrantModelSerializer
496
497    @property
498    def c_hash(self) -> str:
499        """Code hash, see https://openid.net/specs/openid-connect-core-1_0.html#IDToken"""
500        hashed_code = sha256(self.code.encode("ascii")).hexdigest().encode("ascii")
501        return (
502            base64.urlsafe_b64encode(binascii.unhexlify(hashed_code[: len(hashed_code) // 2]))  # first half of the SHA-256 digest
503            .rstrip(b"=")
504            .decode("ascii")
505        )

OAuth2 Authorization Code

def code(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def nonce(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def code_challenge(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def code_challenge_method(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: rest_framework.serializers.Serializer
491    @property
492    def serializer(self) -> type[Serializer]:
493        from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer  # local import, presumably to avoid a circular import (confirm)
494
495        return ExpiringBaseGrantModelSerializer

Get serializer for this model

c_hash
497    @property
498    def c_hash(self) -> str:
499        """Code hash, see https://openid.net/specs/openid-connect-core-1_0.html#IDToken"""
500        hashed_code = sha256(self.code.encode("ascii")).hexdigest().encode("ascii")
501        return (
502            base64.urlsafe_b64encode(binascii.unhexlify(hashed_code[: len(hashed_code) // 2]))  # first half of the SHA-256 digest
503            .rstrip(b"=")
504            .decode("ascii")
505        )
def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def revoked(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def auth_time(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

provider_id
user_id
def get_next_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

session_id
def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class AuthorizationCode.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class AuthorizationCode.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class AccessToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 access token, non-opaque using a JWT as identifier"""

    # Encoded access token; populated by the `id_token` setter below.
    token = models.TextField()
    # JSON dump of the ID token dataclass; read and written through `id_token`.
    _id_token = models.TextField()

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Access Token")
        verbose_name_plural = _("OAuth2 Access Tokens")

    def __str__(self):
        return f"Access Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Rebuild the `IDToken` dataclass from the JSON stored in `_id_token`"""
        from authentik.providers.oauth2.id_token import IDToken

        return from_dict(IDToken, json.loads(self._id_token))

    @id_token.setter
    def id_token(self, value: IDToken):
        """Store `value` as JSON and regenerate `token` from it"""
        # The encoded token is generated first, then the dataclass is serialized.
        encoded_token = value.to_access_token(self.provider, self)
        self.token = encoded_token
        serialized = asdict(value)
        self._id_token = json.dumps(serialized)

    @property
    def at_hash(self):
        """Return the unpadded base64url encoding of the first half of the
        SHA-256 digest of the ASCII-encoded token"""
        digest = sha256(self.token.encode("ascii")).digest()
        left_half = digest[: len(digest) // 2]
        return base64.urlsafe_b64encode(left_half).decode("ascii").rstrip("=")

    @property
    def serializer(self) -> Serializer:
        """Get serializer for this model"""
        # NOTE(review): function-level import, presumably to avoid a circular
        # import with the API module; confirm before moving it to module level.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer

OAuth2 access token, non-opaque using a JWT as identifier

def token(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

id_token
524    @property
525    def id_token(self) -> IDToken:
526        """Load ID Token from json"""
527        from authentik.providers.oauth2.id_token import IDToken
528
529        raw_token = json.loads(self._id_token)
530        return from_dict(IDToken, raw_token)

Load ID Token from json

at_hash
537    @property
538    def at_hash(self):
539        """Get hashed access_token"""
540        hashed_access_token = sha256(self.token.encode("ascii")).hexdigest().encode("ascii")
541        return (
542            base64.urlsafe_b64encode(
543                binascii.unhexlify(hashed_access_token[: len(hashed_access_token) // 2])
544            )
545            .rstrip(b"=")
546            .decode("ascii")
547        )

Get hashed access_token

serializer: rest_framework.serializers.Serializer
549    @property
550    def serializer(self) -> Serializer:
551        from authentik.providers.oauth2.api.tokens import TokenModelSerializer
552
553        return TokenModelSerializer

Get serializer for this model

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def revoked(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def auth_time(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

provider_id
user_id
def get_next_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

session_id
def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class AccessToken.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class AccessToken.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class RefreshToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Refresh Token, opaque"""

    # Token value; generated by `generate_client_secret` when not supplied.
    token = models.TextField(default=generate_client_secret)
    # JSON dump of the ID token dataclass; read and written through `id_token`.
    _id_token = models.TextField(verbose_name=_("ID Token"))
    # Overrides the `session` field inherited from `BaseGrantModel`: refresh tokens
    # have to outlive the session they were issued in, so deleting the session
    # resets this reference to its default (None) rather than removing the token.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Refresh Token")
        verbose_name_plural = _("OAuth2 Refresh Tokens")

    def __str__(self):
        return f"Refresh Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Rebuild the `IDToken` dataclass from the JSON stored in `_id_token`"""
        from authentik.providers.oauth2.id_token import IDToken

        return from_dict(IDToken, json.loads(self._id_token))

    @id_token.setter
    def id_token(self, value: IDToken):
        """Store `value` as JSON; unlike the getter, `token` is left untouched"""
        serialized = asdict(value)
        self._id_token = json.dumps(serialized)

    @property
    def serializer(self) -> Serializer:
        """Get serializer for this model"""
        # NOTE(review): function-level import, presumably to avoid a circular
        # import with the API module; confirm before moving it to module level.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer

OAuth2 Refresh Token, opaque

def token(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

id_token
577    @property
578    def id_token(self) -> IDToken:
579        """Load ID Token from json"""
580        from authentik.providers.oauth2.id_token import IDToken
581
582        raw_token = json.loads(self._id_token)
583        return from_dict(IDToken, raw_token)

Load ID Token from json

serializer: rest_framework.serializers.Serializer
589    @property
590    def serializer(self) -> Serializer:
591        from authentik.providers.oauth2.api.tokens import TokenModelSerializer
592
593        return TokenModelSerializer

Get serializer for this model

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def revoked(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def auth_time(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session_id
provider_id
user_id
def get_next_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_auth_time(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class RefreshToken.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class RefreshToken.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class DeviceToken(InternallyManagedMixin, ExpiringModel):
    """Temporary device token for OAuth device flow"""

    # Nullable and defaults to None, so a token can exist without a user.
    user = models.ForeignKey(
        "authentik_core.User", default=None, on_delete=models.CASCADE, null=True
    )
    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    device_code = models.TextField(default=generate_key)
    user_code = models.TextField(default=generate_code_fixed_length)
    # Scopes stored as one space-separated string; use the `scope` property.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    # Deleting the session resets this reference to None instead of deleting the token.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    class Meta:
        verbose_name = _("Device Token")
        verbose_name_plural = _("Device Tokens")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Device Token for {self.provider_id}"

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        scopes = self._scope.split()
        return scopes

    @scope.setter
    def scope(self, value):
        """Store an iterable of scope strings as one space-separated string"""
        joined = " ".join(value)
        self._scope = joined

Temporary device token for OAuth device flow

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def device_code(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user_code(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

session

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

scope: list[str]
610    @property
611    def scope(self) -> list[str]:
612        """Return scopes as list of strings"""
613        return self._scope.split()

Return scopes as list of strings

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

user_id
provider_id
session_id
def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class DeviceToken.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class DeviceToken.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.