authentik.providers.oauth2.models
OAuth Provider Models
"""OAuth Provider Models"""

import base64
import binascii
import json
from dataclasses import asdict, dataclass
from functools import cached_property
from hashlib import sha256
from typing import TYPE_CHECKING, Any
from urllib.parse import urlparse, urlunparse

from cryptography.hazmat.primitives.asymmetric.ec import (
    SECP256R1,
    SECP384R1,
    SECP521R1,
    EllipticCurvePrivateKey,
)
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
from dacite import Config
from dacite.core import from_dict
from django.contrib.postgres.indexes import HashIndex
from django.db import models
from django.http import HttpRequest
from django.templatetags.static import static
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from jwcrypto.common import json_encode
from jwcrypto.jwe import JWE
from jwcrypto.jwk import JWK
from jwt import encode
from rest_framework.serializers import Serializer
from structlog.stdlib import get_logger

from authentik.brands.models import WebfingerProvider
from authentik.common.oauth.constants import SubModes
from authentik.core.models import (
    AuthenticatedSession,
    ExpiringModel,
    PropertyMapping,
    Provider,
    User,
)
from authentik.crypto.models import CertificateKeyPair
from authentik.lib.generators import generate_code_fixed_length, generate_id, generate_key
from authentik.lib.models import DomainlessURLValidator, InternallyManagedMixin, SerializerModel
from authentik.lib.utils.time import timedelta_string_validator
from authentik.sources.oauth.models import OAuthSource

if TYPE_CHECKING:
    # Only needed for annotations; the runtime import happens lazily inside the
    # `id_token` properties below.
    from authentik.providers.oauth2.id_token import IDToken

LOGGER = get_logger()


def generate_client_secret() -> str:
    """Generate client secret with adequate length"""
    # Also used as the default for `RefreshToken.token` further down.
    return generate_id(128)


class ClientTypes(models.TextChoices):
    """Confidential clients are capable of maintaining the confidentiality
    of their credentials. Public clients are incapable."""

    # Each member is a (stored value, translatable label) pair.
    CONFIDENTIAL = "confidential", _("Confidential")
    PUBLIC = "public", _("Public")


class GrantTypes(models.TextChoices):
    """OAuth2 Grant types we support"""

    # Only the stored value is declared; no explicit label is given.
    AUTHORIZATION_CODE = "authorization_code"
    IMPLICIT = "implicit"
    HYBRID = "hybrid"


class ResponseMode(models.TextChoices):
    """https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#OAuth.Post"""

    QUERY = "query"
    FRAGMENT = "fragment"
    FORM_POST = "form_post"


class IssuerMode(models.TextChoices):
    """Configure how the `iss` field is created."""

    GLOBAL = "global", _("Same identifier is used for all providers")
    PER_PROVIDER = (
        "per_provider",
        _("Each provider has a different issuer, based on the application slug."),
    )


class RedirectURIMatchingMode(models.TextChoices):
    """How a configured redirect URI is compared: strictly or as a regular expression."""

    STRICT = "strict", _("Strict URL comparison")
    REGEX = "regex", _("Regular Expression URL matching")


class OAuth2LogoutMethod(models.TextChoices):
    """OAuth2/OIDC Logout methods"""

    BACKCHANNEL = "backchannel", _("Back-channel")
    FRONTCHANNEL = "frontchannel", _("Front-channel")


@dataclass
class RedirectURI:
    """A single redirect URI entry"""

    # Serialized to/from plain dicts by `OAuth2Provider.redirect_uris`.
    matching_mode: RedirectURIMatchingMode
    url: str


class ResponseTypes(models.TextChoices):
    """Response Type required by the client."""

    CODE = "code", _("code (Authorization Code Flow)")
    ID_TOKEN = "id_token", _("id_token (Implicit Flow)")
    ID_TOKEN_TOKEN = "id_token token", _("id_token token (Implicit Flow)")
    CODE_TOKEN = "code token", _("code token (Hybrid Flow)")
    CODE_ID_TOKEN = "code id_token", _("code id_token (Hybrid Flow)")
    CODE_ID_TOKEN_TOKEN = "code id_token token", _("code id_token token (Hybrid Flow)")


class JWTAlgorithms(models.TextChoices):
    """Algorithm used to sign the JWT Token"""

    HS256 = "HS256", _("HS256 (Symmetric Encryption)")
    RS256 = "RS256", _("RS256 (Asymmetric Encryption)")
    ES256 = "ES256", _("ES256 (Asymmetric Encryption)")
    ES384 = "ES384", _("ES384 (Asymmetric Encryption)")
    ES512 = "ES512", _("ES512 (Asymmetric Encryption)")

    @classmethod
    def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str:
        """Pick the signing algorithm that matches the type of `private_key`.

        RSA keys map to RS256; elliptic-curve keys map to ES256/ES384/ES512 for the
        SECP256R1/SECP384R1/SECP521R1 curves respectively.

        :raises ValueError: for any other key type (including `None`) and for
            elliptic-curve keys on a curve not listed above.
        """
        if isinstance(private_key, RSAPrivateKey):
            return cls.RS256
        if isinstance(private_key, EllipticCurvePrivateKey):
            curve = private_key.curve
            if isinstance(curve, SECP256R1):
                return cls.ES256
            if isinstance(curve, SECP384R1):
                return cls.ES384
            if isinstance(curve, SECP521R1):
                return cls.ES512
        # Reached for unsupported key types and for EC keys on an unlisted curve.
        raise ValueError(f"Invalid private key type: {type(private_key)}")


class ScopeMapping(PropertyMapping):
    """Map an OAuth Scope to users properties"""

    scope_name = models.TextField(help_text=_("Scope used by the client"))
    description = models.TextField(
        blank=True,
        help_text=_(
            "Description shown to the user when consenting. "
            "If left empty, the user won't be informed."
        ),
    )

    @property
    def component(self) -> str:
        return "ak-property-mapping-provider-scope-form"

    @property
    def serializer(self) -> type[Serializer]:
        # Imported lazily inside the property rather than at module level.
        from authentik.providers.oauth2.api.scopes import ScopeMappingSerializer

        return ScopeMappingSerializer

    def __str__(self):
        return f"Scope Mapping {self.name} ({self.scope_name})"

    class Meta:
        verbose_name = _("Scope Mapping")
        verbose_name_plural = _("Scope Mappings")


class OAuth2Provider(WebfingerProvider, Provider):
    """OAuth2 Provider for generic OAuth and OpenID Connect Applications."""

    client_type = models.CharField(
        max_length=30,
        choices=ClientTypes.choices,
        default=ClientTypes.CONFIDENTIAL,
        verbose_name=_("Client Type"),
        help_text=_(
            "Confidential clients are capable of maintaining the confidentiality "
            "of their credentials. Public clients are incapable"
        ),
    )
    client_id = models.CharField(
        max_length=255,
        unique=True,
        verbose_name=_("Client ID"),
        default=generate_id,
    )
    client_secret = models.CharField(
        max_length=255,
        blank=True,
        verbose_name=_("Client Secret"),
        default=generate_client_secret,
    )
    # Raw JSON storage; use the `redirect_uris` property for typed access.
    _redirect_uris = models.JSONField(
        default=list,
        verbose_name=_("Redirect URIs"),
    )
    logout_uri = models.TextField(
        validators=[DomainlessURLValidator(schemes=("http", "https"))],
        verbose_name=_("Logout URI"),
        blank=True,
    )
    logout_method = models.TextField(
        choices=OAuth2LogoutMethod.choices,
        default=OAuth2LogoutMethod.BACKCHANNEL,
        verbose_name=_("Logout Method"),
        help_text=_(
            "Backchannel logs out with server to server calls. "
            "Frontchannel uses iframes in your browser"
        ),
    )

    include_claims_in_id_token = models.BooleanField(
        default=True,
        verbose_name=_("Include claims in id_token"),
        help_text=_(
            "Include User claims from scopes in the id_token, for applications "
            "that don't access the userinfo endpoint."
        ),
    )

    # The four durations below are stored as strings and checked by
    # `timedelta_string_validator`.
    access_code_validity = models.TextField(
        default="minutes=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Access codes not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    access_token_validity = models.TextField(
        default="hours=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_validity = models.TextField(
        default="days=30",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_threshold = models.TextField(
        default="seconds=0",
        validators=[timedelta_string_validator],
        help_text=_(
            "When refreshing a token, if the refresh token is valid for less than "
            "this duration, it will be renewed. "
            "When set to seconds=0, token will always be renewed. "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )

    sub_mode = models.TextField(
        choices=SubModes.choices,
        default=SubModes.HASHED_USER_ID,
        help_text=_(
            "Configure what data should be used as unique User Identifier. For most cases, "
            "the default should be fine."
        ),
    )
    issuer_mode = models.TextField(
        choices=IssuerMode.choices,
        default=IssuerMode.PER_PROVIDER,
        help_text=_("Configure how the issuer field of the ID Token should be filled."),
    )

    signing_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Signing Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_("Key used to sign the tokens."),
        related_name="oauth2provider_signing_key_set",
    )
    encryption_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Encryption Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_(
            "Key used to encrypt the tokens. When set, "
            "tokens will be encrypted and returned as JWEs."
        ),
        related_name="oauth2provider_encryption_key_set",
    )

    jwt_federation_sources = models.ManyToManyField(
        OAuthSource,
        verbose_name=_(
            "Any JWT signed by the JWK of the selected source can be used to authenticate."
        ),
        related_name="oauth2_providers",
        default=None,
        blank=True,
    )
    jwt_federation_providers = models.ManyToManyField("OAuth2Provider", blank=True, default=None)

    @cached_property
    def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
        """Get either the configured certificate or the client secret"""
        # NOTE: cached per instance, so a later change to `signing_key` on the same
        # instance is not reflected here.
        if not self.signing_key:
            # No Certificate at all, assume HS256
            return self.client_secret, JWTAlgorithms.HS256
        key: CertificateKeyPair = self.signing_key
        private_key = key.private_key
        return private_key, JWTAlgorithms.from_private_key(private_key)

    def get_issuer(self, request: HttpRequest) -> str | None:
        """Get issuer, based on request"""
        if self.issuer_mode == IssuerMode.GLOBAL:
            return request.build_absolute_uri(reverse("authentik_core:root-redirect"))
        try:
            url = reverse(
                "authentik_providers_oauth2:provider-root",
                kwargs={
                    "application_slug": self.application.slug,
                },
            )
            return request.build_absolute_uri(url)
        except Provider.application.RelatedObjectDoesNotExist:
            # Provider is not attached to an application, so there is no slug to build from.
            return None

    @property
    def redirect_uris(self) -> list[RedirectURI]:
        """Redirect URIs parsed from the JSON in `_redirect_uris` into `RedirectURI` objects."""
        uris = []
        for entry in self._redirect_uris:
            uris.append(
                from_dict(
                    RedirectURI,
                    entry,
                    # Convert the stored string back into the enum member.
                    config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
                )
            )
        return uris

    @redirect_uris.setter
    def redirect_uris(self, value: list[RedirectURI]):
        # Store plain dicts so the JSONField can hold them.
        cleansed = []
        for entry in value:
            cleansed.append(asdict(entry))
        self._redirect_uris = cleansed

    @property
    def launch_url(self) -> str | None:
        """Guess launch_url based on first redirect_uri"""
        redirects = self.redirect_uris
        if len(redirects) < 1:
            return None
        main_url = redirects[0].url
        try:
            # Only the path component is blanked; every other URL component is kept.
            launch_url = urlparse(main_url)._replace(path="")
            return urlunparse(launch_url)
        except ValueError as exc:
            LOGGER.warning("Failed to format launch url", exc=exc)
            return None

    @property
    def icon_url(self) -> str | None:
        return static("authentik/sources/openidconnect.svg")

    @property
    def component(self) -> str:
        return "ak-provider-oauth2-form"

    @property
    def serializer(self) -> type[Serializer]:
        # Imported lazily inside the property rather than at module level.
        from authentik.providers.oauth2.api.providers import OAuth2ProviderSerializer

        return OAuth2ProviderSerializer

    def __str__(self):
        return f"OAuth2 Provider {self.name}"

    def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
        """Represent the ID Token as a JSON Web Token (JWT).

        :param payload The payload to encode into the JWT
        :param jwt_type The type of the JWT. This will be put in the JWT header using the `typ`
            parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of `JWT`.
        """
        headers = {}
        if self.signing_key:
            headers["kid"] = self.signing_key.kid
        if jwt_type is not None:
            headers["typ"] = jwt_type
        key, alg = self.jwt_key
        # `encode` is the function imported from the `jwt` package, not this method.
        encoded = encode(payload, key, algorithm=alg, headers=headers)
        if self.encryption_key:
            # Signed token is wrapped in a JWE when an encryption key is configured.
            return self.encrypt(encoded)
        return encoded

    def encrypt(self, raw: str) -> str:
        """Encrypt JWT"""
        # Recipient key is loaded from the PEM in the encryption key pair's `certificate_data`.
        key = JWK.from_pem(self.encryption_key.certificate_data.encode())
        jwe = JWE(
            raw,
            json_encode(
                {
                    "alg": "RSA-OAEP-256",
                    "enc": "A256CBC-HS512",
                    "typ": "JWE",
                    "kid": self.encryption_key.kid,
                }
            ),
        )
        jwe.add_recipient(key)
        return jwe.serialize(compact=True)

    def webfinger(self, resource: str, request: HttpRequest):
        # NOTE(review): unlike `get_issuer`, a missing `self.application` is not caught
        # here — confirm callers only invoke this for providers bound to an application.
        return {
            "subject": resource,
            "links": [
                {
                    "rel": "http://openid.net/specs/connect/1.0/issuer",
                    "href": request.build_absolute_uri(
                        reverse(
                            "authentik_providers_oauth2:provider-root",
                            kwargs={
                                "application_slug": self.application.slug,
                            },
                        )
                    ),
                },
            ],
        }

    class Meta:
        verbose_name = _("OAuth2/OpenID Provider")
        verbose_name_plural = _("OAuth2/OpenID Providers")


class BaseGrantModel(models.Model):
    """Base Model for all grants"""

    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
    revoked = models.BooleanField(default=False)
    # Space-separated scope names; use the `scope` property for list access.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    auth_time = models.DateTimeField(verbose_name="Authentication time")
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.CASCADE, default=None
    )

    class Meta:
        abstract = True

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        return self._scope.split()

    @scope.setter
    def scope(self, value):
        self._scope = " ".join(value)


class AuthorizationCode(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Authorization Code"""

    code = models.CharField(max_length=255, unique=True, verbose_name=_("Code"))
    nonce = models.TextField(null=True, default=None, verbose_name=_("Nonce"))
    code_challenge = models.CharField(max_length=255, null=True, verbose_name=_("Code Challenge"))
    code_challenge_method = models.CharField(
        max_length=255, null=True, verbose_name=_("Code Challenge Method")
    )

    class Meta:
        verbose_name = _("Authorization Code")
        verbose_name_plural = _("Authorization Codes")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Authorization code for {self.provider_id} for user {self.user_id}"

    @property
    def serializer(self) -> type[Serializer]:
        # Returns the serializer class itself, not an instance.
        from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer

        return ExpiringBaseGrantModelSerializer

    @property
    def c_hash(self) -> str:
        """https://openid.net/specs/openid-connect-core-1_0.html#IDToken"""
        # SHA-256 of the ASCII code; the first half of the hex digest is converted back
        # to bytes and base64url-encoded with the `=` padding stripped.
        hashed_code = sha256(self.code.encode("ascii")).hexdigest().encode("ascii")
        return (
            base64.urlsafe_b64encode(binascii.unhexlify(hashed_code[: len(hashed_code) // 2]))
            .rstrip(b"=")
            .decode("ascii")
        )


class AccessToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 access token, non-opaque using a JWT as identifier"""

    token = models.TextField()
    # JSON-serialized ID token; use the `id_token` property for typed access.
    _id_token = models.TextField()

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Access Token")
        verbose_name_plural = _("OAuth2 Access Tokens")

    def __str__(self):
        return f"Access Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        # Setting the ID token also regenerates `token` from it.
        self.token = value.to_access_token(self.provider, self)
        self._id_token = json.dumps(asdict(value))

    @property
    def at_hash(self) -> str:
        """Get hashed access_token"""
        # Same construction as `AuthorizationCode.c_hash`, applied to the access token.
        hashed_access_token = sha256(self.token.encode("ascii")).hexdigest().encode("ascii")
        return (
            base64.urlsafe_b64encode(
                binascii.unhexlify(hashed_access_token[: len(hashed_access_token) // 2])
            )
            .rstrip(b"=")
            .decode("ascii")
        )

    @property
    def serializer(self) -> type[Serializer]:
        # Returns the serializer class itself, not an instance.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer


class RefreshToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Refresh Token, opaque"""

    token = models.TextField(default=generate_client_secret)
    _id_token = models.TextField(verbose_name=_("ID Token"))
    # Shadow the `session` field from `BaseGrantModel` as we want refresh tokens to persist even
    # when the session is terminated.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Refresh Token")
        verbose_name_plural = _("OAuth2 Refresh Tokens")

    def __str__(self):
        return f"Refresh Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        # Unlike `AccessToken.id_token`, `token` is left untouched here.
        self._id_token = json.dumps(asdict(value))

    @property
    def serializer(self) -> type[Serializer]:
        # Returns the serializer class itself, not an instance.
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer


class DeviceToken(InternallyManagedMixin, ExpiringModel):
    """Temporary device token for OAuth device flow"""

    # Nullable and defaults to None, so a token can exist without an associated user.
    user = models.ForeignKey(
        "authentik_core.User", default=None, on_delete=models.CASCADE, null=True
    )
    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    device_code = models.TextField(default=generate_key)
    user_code = models.TextField(default=generate_code_fixed_length)
    # Space-separated scope names; use the `scope` property for list access.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        return self._scope.split()

    @scope.setter
    def scope(self, value):
        self._scope = " ".join(value)

    class Meta:
        verbose_name = _("Device Token")
        verbose_name_plural = _("Device Tokens")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Device Token for {self.provider_id}"
def generate_client_secret() -> str:
    """Create a new client secret by asking `generate_id` for a length of 128."""
    secret_length = 128
    return generate_id(secret_length)
Generate client secret with adequate length
class ClientTypes(models.TextChoices):
    """Confidential clients are capable of maintaining the confidentiality
    of their credentials. Public clients are incapable."""

    # Each member is a (stored value, translatable label) pair.
    CONFIDENTIAL = "confidential", _("Confidential")
    PUBLIC = "public", _("Public")
Confidential clients are capable of maintaining the confidentiality of their credentials. Public clients are incapable.
class GrantTypes(models.TextChoices):
    """OAuth2 Grant types we support"""

    # Only the stored value is declared; no explicit label is given.
    AUTHORIZATION_CODE = "authorization_code"
    IMPLICIT = "implicit"
    HYBRID = "hybrid"
OAuth2 Grant types we support
class ResponseMode(models.TextChoices):
    """Supported `response_mode` values: `query`, `fragment` and `form_post`.

    See https://openid.net/specs/oauth-v2-multiple-response-types-1_0.html#OAuth.Post
    """

    # Only the stored value is declared; no explicit label is given.
    QUERY = "query"
    FRAGMENT = "fragment"
    FORM_POST = "form_post"
class IssuerMode(models.TextChoices):
    """Configure how the `iss` field is created."""

    # One shared issuer for every provider.
    GLOBAL = "global", _("Same identifier is used for all providers")
    # Issuer derived from the slug of the provider's application.
    PER_PROVIDER = (
        "per_provider",
        _("Each provider has a different issuer, based on the application slug."),
    )
Configure how the `iss` field is created.
class RedirectURIMatchingMode(models.TextChoices):
    """How a configured redirect URI is compared: strictly or as a regular expression."""

    STRICT = "strict", _("Strict URL comparison")
    REGEX = "regex", _("Regular Expression URL matching")
Class for creating enumerated string choices.
class OAuth2LogoutMethod(models.TextChoices):
    """OAuth2/OIDC Logout methods"""

    # Each member is a (stored value, translatable label) pair.
    BACKCHANNEL = "backchannel", _("Back-channel")
    FRONTCHANNEL = "frontchannel", _("Front-channel")
OAuth2/OIDC Logout methods
@dataclass
class RedirectURI:
    """A single redirect URI entry"""

    # Whether `url` is matched strictly or as a regular expression.
    matching_mode: RedirectURIMatchingMode
    # The URL, or the pattern when `matching_mode` is REGEX.
    url: str
A single redirect URI entry
class ResponseTypes(models.TextChoices):
    """Response Type required by the client."""

    # Each member is a (stored value, translatable label) pair; the label names the
    # flow the response type belongs to.
    CODE = "code", _("code (Authorization Code Flow)")
    ID_TOKEN = "id_token", _("id_token (Implicit Flow)")
    ID_TOKEN_TOKEN = "id_token token", _("id_token token (Implicit Flow)")
    CODE_TOKEN = "code token", _("code token (Hybrid Flow)")
    CODE_ID_TOKEN = "code id_token", _("code id_token (Hybrid Flow)")
    CODE_ID_TOKEN_TOKEN = "code id_token token", _("code id_token token (Hybrid Flow)")
Response Type required by the client.
class JWTAlgorithms(models.TextChoices):
    """Algorithm used to sign the JWT Token"""

    HS256 = "HS256", _("HS256 (Symmetric Encryption)")
    RS256 = "RS256", _("RS256 (Asymmetric Encryption)")
    ES256 = "ES256", _("ES256 (Asymmetric Encryption)")
    ES384 = "ES384", _("ES384 (Asymmetric Encryption)")
    ES512 = "ES512", _("ES512 (Asymmetric Encryption)")

    @classmethod
    def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str:
        """Return the algorithm member matching the type of `private_key`.

        RSA keys yield RS256. Elliptic-curve keys yield ES256, ES384 or ES512 for
        the SECP256R1, SECP384R1 and SECP521R1 curves respectively.

        :raises ValueError: for any other key type (including `None`) and for
            elliptic-curve keys on a curve not listed above.
        """
        if isinstance(private_key, RSAPrivateKey):
            return cls.RS256
        if isinstance(private_key, EllipticCurvePrivateKey):
            key_curve = private_key.curve
            # Kept local to the method: a class-level attribute on a TextChoices
            # subclass would be turned into an enum member.
            curve_algorithms = (
                (SECP256R1, cls.ES256),
                (SECP384R1, cls.ES384),
                (SECP521R1, cls.ES512),
            )
            for curve_type, algorithm in curve_algorithms:
                if isinstance(key_curve, curve_type):
                    return algorithm
        raise ValueError(f"Invalid private key type: {type(private_key)}")
Algorithm used to sign the JWT Token
136 @classmethod 137 def from_private_key(cls, private_key: PrivateKeyTypes | None) -> str: 138 if isinstance(private_key, RSAPrivateKey): 139 return cls.RS256 140 if isinstance(private_key, EllipticCurvePrivateKey): 141 curve = private_key.curve 142 if isinstance(curve, SECP256R1): 143 return cls.ES256 144 if isinstance(curve, SECP384R1): 145 return cls.ES384 146 if isinstance(curve, SECP521R1): 147 return cls.ES512 148 raise ValueError(f"Invalid private key type: {type(private_key)}")
class ScopeMapping(PropertyMapping):
    """Map an OAuth Scope to users properties"""

    # Name of the scope as requested by the client.
    scope_name = models.TextField(help_text=_("Scope used by the client"))
    # Optional text shown to the user when consenting.
    description = models.TextField(
        blank=True,
        help_text=_(
            "Description shown to the user when consenting. "
            "If left empty, the user won't be informed."
        ),
    )

    @property
    def component(self) -> str:
        """Return the identifier string of the form component for this mapping type."""
        return "ak-property-mapping-provider-scope-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        # Imported lazily inside the property rather than at module level.
        from authentik.providers.oauth2.api.scopes import ScopeMappingSerializer

        return ScopeMappingSerializer

    def __str__(self):
        return f"Scope Mapping {self.name} ({self.scope_name})"

    class Meta:
        verbose_name = _("Scope Mapping")
        verbose_name_plural = _("Scope Mappings")
Map an OAuth Scope to users properties
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        # Imported inside the property rather than at module level
        # (presumably to avoid a circular import with the API module — TODO confirm).
        from authentik.providers.oauth2.api.scopes import ScopeMappingSerializer

        return ScopeMappingSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::

    class Restaurant(Model):
        place = OneToOneField(Place, related_name='restaurant')

``Restaurant.place`` is a ``ForwardOneToOneDescriptor`` instance.
Inherited Members
- authentik.core.models.PropertyMapping
- pm_uuid
- name
- expression
- objects
- evaluate
- managed
- provider_set
- source_userpropertymappings_set
- source_grouppropertymappings_set
- notificationwebhookmapping
- oauthsourcepropertymapping
- scopemapping
- endpoint_set
- racpropertymapping
- radiusproviderpropertymapping
- samlsourcepropertymapping
- samlpropertymapping
- scimprovider_set
- scimmapping
- kerberossourcepropertymapping
- ldapsourcepropertymapping
- plexsourcepropertymapping
- scimsourcepropertymapping
- telegramsourcepropertymapping
- googleworkspaceprovider_set
- googleworkspaceprovidermapping
- microsoftentraprovider_set
- microsoftentraprovidermapping
The requested object does not exist
The query returned multiple objects when only one was expected.
class OAuth2Provider(WebfingerProvider, Provider):
    """OAuth2 Provider for generic OAuth and OpenID Connect Applications."""

    client_type = models.CharField(
        max_length=30,
        choices=ClientTypes.choices,
        default=ClientTypes.CONFIDENTIAL,
        verbose_name=_("Client Type"),
        help_text=_(
            "Confidential clients are capable of maintaining the confidentiality "
            "of their credentials. Public clients are incapable"
        ),
    )
    client_id = models.CharField(
        max_length=255,
        unique=True,
        verbose_name=_("Client ID"),
        default=generate_id,
    )
    # When no signing key is set, this secret doubles as the HS256 signing key
    # (see `jwt_key`).
    client_secret = models.CharField(
        max_length=255,
        blank=True,
        verbose_name=_("Client Secret"),
        default=generate_client_secret,
    )
    # Raw JSON storage; use the `redirect_uris` property for typed access.
    _redirect_uris = models.JSONField(
        default=list,
        verbose_name=_("Redirect URIs"),
    )
    logout_uri = models.TextField(
        validators=[DomainlessURLValidator(schemes=("http", "https"))],
        verbose_name=_("Logout URI"),
        blank=True,
    )
    logout_method = models.TextField(
        choices=OAuth2LogoutMethod.choices,
        default=OAuth2LogoutMethod.BACKCHANNEL,
        verbose_name=_("Logout Method"),
        help_text=_(
            "Backchannel logs out with server to server calls. "
            "Frontchannel uses iframes in your browser"
        ),
    )

    include_claims_in_id_token = models.BooleanField(
        default=True,
        verbose_name=_("Include claims in id_token"),
        help_text=_(
            "Include User claims from scopes in the id_token, for applications "
            "that don't access the userinfo endpoint."
        ),
    )

    # The four durations below are stored as strings and checked by
    # `timedelta_string_validator`.
    access_code_validity = models.TextField(
        default="minutes=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Access codes not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    access_token_validity = models.TextField(
        default="hours=1",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_validity = models.TextField(
        default="days=30",
        validators=[timedelta_string_validator],
        help_text=_(
            "Tokens not valid on or after current time + this value "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )
    refresh_token_threshold = models.TextField(
        default="seconds=0",
        validators=[timedelta_string_validator],
        help_text=_(
            "When refreshing a token, if the refresh token is valid for less than "
            "this duration, it will be renewed. "
            "When set to seconds=0, token will always be renewed. "
            "(Format: hours=1;minutes=2;seconds=3)."
        ),
    )

    sub_mode = models.TextField(
        choices=SubModes.choices,
        default=SubModes.HASHED_USER_ID,
        help_text=_(
            "Configure what data should be used as unique User Identifier. For most cases, "
            "the default should be fine."
        ),
    )
    issuer_mode = models.TextField(
        choices=IssuerMode.choices,
        default=IssuerMode.PER_PROVIDER,
        help_text=_("Configure how the issuer field of the ID Token should be filled."),
    )

    # Both key references are nulled (not cascaded) when the key pair is deleted.
    signing_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Signing Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_("Key used to sign the tokens."),
        related_name="oauth2provider_signing_key_set",
    )
    encryption_key = models.ForeignKey(
        CertificateKeyPair,
        verbose_name=_("Encryption Key"),
        on_delete=models.SET_NULL,
        null=True,
        help_text=_(
            "Key used to encrypt the tokens. When set, "
            "tokens will be encrypted and returned as JWEs."
        ),
        related_name="oauth2provider_encryption_key_set",
    )

    jwt_federation_sources = models.ManyToManyField(
        OAuthSource,
        verbose_name=_(
            "Any JWT signed by the JWK of the selected source can be used to authenticate."
        ),
        related_name="oauth2_providers",
        default=None,
        blank=True,
    )
    jwt_federation_providers = models.ManyToManyField("OAuth2Provider", blank=True, default=None)

    @cached_property
    def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
        """Get either the configured certificate or the client secret"""
        # NOTE: cached per instance, so a later change to `signing_key` on the same
        # instance is not reflected here.
        if not self.signing_key:
            # No Certificate at all, assume HS256
            return self.client_secret, JWTAlgorithms.HS256
        key: CertificateKeyPair = self.signing_key
        private_key = key.private_key
        return private_key, JWTAlgorithms.from_private_key(private_key)

    def get_issuer(self, request: HttpRequest) -> str | None:
        """Get issuer, based on request"""
        if self.issuer_mode == IssuerMode.GLOBAL:
            return request.build_absolute_uri(reverse("authentik_core:root-redirect"))
        try:
            url = reverse(
                "authentik_providers_oauth2:provider-root",
                kwargs={
                    "application_slug": self.application.slug,
                },
            )
            return request.build_absolute_uri(url)
        except Provider.application.RelatedObjectDoesNotExist:
            # Provider is not attached to an application, so there is no slug to build from.
            return None

    @property
    def redirect_uris(self) -> list[RedirectURI]:
        """Redirect URIs parsed from the JSON in `_redirect_uris` into `RedirectURI` objects."""
        uris = []
        for entry in self._redirect_uris:
            uris.append(
                from_dict(
                    RedirectURI,
                    entry,
                    # Convert the stored string back into the enum member.
                    config=Config(type_hooks={RedirectURIMatchingMode: RedirectURIMatchingMode}),
                )
            )
        return uris

    @redirect_uris.setter
    def redirect_uris(self, value: list[RedirectURI]):
        # Store plain dicts so the JSONField can hold them.
        cleansed = []
        for entry in value:
            cleansed.append(asdict(entry))
        self._redirect_uris = cleansed

    @property
    def launch_url(self) -> str | None:
        """Guess launch_url based on first redirect_uri"""
        redirects = self.redirect_uris
        if len(redirects) < 1:
            return None
        main_url = redirects[0].url
        try:
            # Only the path component is blanked; every other URL component is kept.
            launch_url = urlparse(main_url)._replace(path="")
            return urlunparse(launch_url)
        except ValueError as exc:
            LOGGER.warning("Failed to format launch url", exc=exc)
            return None

    @property
    def icon_url(self) -> str | None:
        """Return the static URL of the OpenID Connect icon."""
        return static("authentik/sources/openidconnect.svg")

    @property
    def component(self) -> str:
        """Return the identifier string of the form component for this provider type."""
        return "ak-provider-oauth2-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        # Imported lazily inside the property rather than at module level.
        from authentik.providers.oauth2.api.providers import OAuth2ProviderSerializer

        return OAuth2ProviderSerializer

    def __str__(self):
        return f"OAuth2 Provider {self.name}"

    def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
        """Represent the ID Token as a JSON Web Token (JWT).

        :param payload The payload to encode into the JWT
        :param jwt_type The type of the JWT. This will be put in the JWT header using the `typ`
            parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of `JWT`.
        """
        headers = {}
        if self.signing_key:
            headers["kid"] = self.signing_key.kid
        if jwt_type is not None:
            headers["typ"] = jwt_type
        key, alg = self.jwt_key
        # `encode` is the function imported from the `jwt` package, not this method.
        encoded = encode(payload, key, algorithm=alg, headers=headers)
        if self.encryption_key:
            # Signed token is wrapped in a JWE when an encryption key is configured.
            return self.encrypt(encoded)
        return encoded

    def encrypt(self, raw: str) -> str:
        """Encrypt JWT"""
        # Recipient key is loaded from the PEM in the encryption key pair's `certificate_data`.
        key = JWK.from_pem(self.encryption_key.certificate_data.encode())
        jwe = JWE(
            raw,
            json_encode(
                {
                    "alg": "RSA-OAEP-256",
                    "enc": "A256CBC-HS512",
                    "typ": "JWE",
                    "kid": self.encryption_key.kid,
                }
            ),
        )
        jwe.add_recipient(key)
        return jwe.serialize(compact=True)

    def webfinger(self, resource: str, request: HttpRequest):
        """Build a webfinger response whose single link points at this provider's root URL."""
        # NOTE(review): unlike `get_issuer`, a missing `self.application` is not caught
        # here — confirm callers only invoke this for providers bound to an application.
        return {
            "subject": resource,
            "links": [
                {
                    "rel": "http://openid.net/specs/connect/1.0/issuer",
                    "href": request.build_absolute_uri(
                        reverse(
                            "authentik_providers_oauth2:provider-root",
                            kwargs={
                                "application_slug": self.application.slug,
                            },
                        )
                    ),
                },
            ],
        }

    class Meta:
        verbose_name = _("OAuth2/OpenID Provider")
        verbose_name_plural = _("OAuth2/OpenID Providers")
OAuth2 Provider for generic OAuth and OpenID Connect Applications.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::

    class Child(Model):
        parent = ForeignKey(Parent, related_name='children')

``Child.parent`` is a ``ForwardManyToOneDescriptor`` instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
@cached_property
def jwt_key(self) -> tuple[str | PrivateKeyTypes, str]:
    """Return the ``(key, algorithm)`` pair used to sign JWTs.

    With a signing key configured, the pair is its private key and the
    algorithm derived from that key; otherwise it is the client secret
    together with HS256.
    """
    keypair: CertificateKeyPair | None = self.signing_key
    if keypair:
        signing_material = keypair.private_key
        return signing_material, JWTAlgorithms.from_private_key(signing_material)
    # No certificate at all, fall back to the shared client secret
    return self.client_secret, JWTAlgorithms.HS256
Get either the configured certificate or the client secret
def get_issuer(self, request: HttpRequest) -> str | None:
    """Build the absolute issuer URL for this provider from the request.

    In global issuer mode the URL of the core root redirect is used. Otherwise
    the provider-root URL of the linked application is used; ``None`` is
    returned when the provider has no related application.
    """
    uses_global_issuer = self.issuer_mode == IssuerMode.GLOBAL
    if uses_global_issuer:
        root_path = reverse("authentik_core:root-redirect")
        return request.build_absolute_uri(root_path)
    try:
        route_kwargs = {"application_slug": self.application.slug}
        return request.build_absolute_uri(
            reverse("authentik_providers_oauth2:provider-root", kwargs=route_kwargs)
        )
    except Provider.application.RelatedObjectDoesNotExist:
        return None
Get issuer, based on request
359 @property 360 def launch_url(self) -> str | None: 361 """Guess launch_url based on first redirect_uri""" 362 redirects = self.redirect_uris 363 if len(redirects) < 1: 364 return None 365 main_url = redirects[0].url 366 try: 367 launch_url = urlparse(main_url)._replace(path="") 368 return urlunparse(launch_url) 369 except ValueError as exc: 370 LOGGER.warning("Failed to format launch url", exc=exc) 371 return None
Guess launch_url based on first redirect_uri
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class used for this model."""
    # Imported at call time rather than at module import
    from authentik.providers.oauth2.api.providers import (
        OAuth2ProviderSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
def encode(self, payload: dict[str, Any], jwt_type: str | None = None) -> str:
    """Serialize a payload as a signed JSON Web Token (JWT).

    :param payload The claims to encode into the JWT
    :param jwt_type Optional value for the `typ` header (RFC7515 Section 4.1.9).
        When omitted, no `typ` header is set here.
    :return The signed token; it is additionally passed through `encrypt`
        when an encryption key is configured.
    """
    jose_headers = {}
    signing_pair = self.signing_key
    if signing_pair:
        jose_headers["kid"] = signing_pair.kid
    if jwt_type is not None:
        jose_headers["typ"] = jwt_type
    secret, algorithm = self.jwt_key
    signed = encode(payload, secret, algorithm=algorithm, headers=jose_headers)
    return self.encrypt(signed) if self.encryption_key else signed
Represent the ID Token as a JSON Web Token (JWT).
:param payload The payload to encode into the JWT
:param jwt_type The type of the JWT. This will be put in the JWT header using the typ
parameter. See RFC7515 Section 4.1.9. If not set fallback to the default of JWT.
def encrypt(self, raw: str) -> str:
    """Wrap an encoded JWT in a compact-serialized JWE.

    The recipient key is loaded from the PEM certificate data of the
    configured encryption key.
    """
    encryption_pair = self.encryption_key
    recipient = JWK.from_pem(encryption_pair.certificate_data.encode())
    protected_header = {
        "alg": "RSA-OAEP-256",
        "enc": "A256CBC-HS512",
        "typ": "JWE",
        "kid": encryption_pair.kid,
    }
    envelope = JWE(raw, json_encode(protected_header))
    envelope.add_recipient(recipient)
    return envelope.serialize(compact=True)
Encrypt JWT
def webfinger(self, resource: str, request: HttpRequest):
    """Build the webfinger response dict for ``resource``.

    The response contains a single OpenID Connect issuer link pointing at the
    absolute provider-root URL of the linked application.
    """
    issuer_path = reverse(
        "authentik_providers_oauth2:provider-root",
        kwargs={"application_slug": self.application.slug},
    )
    issuer_link = {
        "rel": "http://openid.net/specs/connect/1.0/issuer",
        "href": request.build_absolute_uri(issuer_path),
    }
    return {"subject": resource, "links": [issuer_link]}
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
- authentik.core.models.Provider
- name
- authentication_flow
- invalidation_flow
- property_mappings
- backchannel_application
- is_backchannel
- objects
- authentication_flow_id
- invalidation_flow_id
- backchannel_application_id
- id
- application
- outpost_set
- oauth2provider
- ldapprovider
- racprovider
- radiusprovider
- samlprovider
- scimprovider
- googleworkspaceprovider
- microsoftentraprovider
- ssfprovider
The requested object does not exist
The query returned multiple objects when only one was expected.
class BaseGrantModel(models.Model):
    """Base Model for all grants

    Abstract model (``Meta.abstract``) holding the fields shared by grant
    models: provider, user, revocation flag, scopes, authentication time and
    an optional session.
    """

    # Deleting the provider or the user cascades to the grant.
    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    user = models.ForeignKey(User, verbose_name=_("User"), on_delete=models.CASCADE)
    revoked = models.BooleanField(default=False)
    # Scopes are stored as a single whitespace-separated string; read and write
    # them through the `scope` property below.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    auth_time = models.DateTimeField(verbose_name="Authentication time")
    # Optional session link; deleting the session cascades to the grant.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.CASCADE, default=None
    )

    class Meta:
        abstract = True

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        return self._scope.split()

    @scope.setter
    def scope(self, value):
        # Persist the given scopes joined by single spaces.
        self._scope = " ".join(value)
Base Model for all grants
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
@property
def scope(self) -> list[str]:
    """Split the stored scope string on whitespace and return the parts."""
    stored_scopes = self._scope
    return stored_scopes.split()
Return scopes as list of strings
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
class AuthorizationCode(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Authorization Code

    Stores the unique code together with the optional nonce and the optional
    code challenge and code challenge method.
    """

    code = models.CharField(max_length=255, unique=True, verbose_name=_("Code"))
    nonce = models.TextField(null=True, default=None, verbose_name=_("Nonce"))
    code_challenge = models.CharField(max_length=255, null=True, verbose_name=_("Code Challenge"))
    code_challenge_method = models.CharField(
        max_length=255, null=True, verbose_name=_("Code Challenge Method")
    )

    class Meta:
        verbose_name = _("Authorization Code")
        verbose_name_plural = _("Authorization Codes")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Authorization code for {self.provider_id} for user {self.user_id}"

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer

        return ExpiringBaseGrantModelSerializer

    @property
    def c_hash(self) -> str:
        """Return the code hash of this authorization code.

        The value is the unpadded URL-safe base64 encoding of the left half of
        the SHA-256 digest of the ASCII-encoded code, see
        https://openid.net/specs/openid-connect-core-1_0.html#IDToken
        """
        digest = sha256(self.code.encode("ascii")).digest()
        # Only the left-most half of the digest is encoded
        left_half = digest[: len(digest) // 2]
        return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")
OAuth2 Authorization Code
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class (not an instance) for this model."""
    from authentik.providers.oauth2.api.tokens import ExpiringBaseGrantModelSerializer

    return ExpiringBaseGrantModelSerializer
Get serializer for this model
@property
def c_hash(self) -> str:
    """Return the code hash of this authorization code.

    The value is the unpadded URL-safe base64 encoding of the left half of
    the SHA-256 digest of the ASCII-encoded code, see
    https://openid.net/specs/openid-connect-core-1_0.html#IDToken
    """
    digest = sha256(self.code.encode("ascii")).digest()
    # Only the left-most half of the digest is encoded
    left_half = digest[: len(digest) // 2]
    return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.
class AccessToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 access token, non-opaque using a JWT as identifier"""

    token = models.TextField()
    # JSON dump of the ID token; read and write it through the `id_token` property.
    _id_token = models.TextField()

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Access Token")
        verbose_name_plural = _("OAuth2 Access Tokens")

    def __str__(self):
        return f"Access Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        # Setting the ID token also regenerates the access token string from it.
        self.token = value.to_access_token(self.provider, self)
        self._id_token = json.dumps(asdict(value))

    @property
    def at_hash(self) -> str:
        """Get hashed access_token

        The value is the unpadded URL-safe base64 encoding of the left half of
        the SHA-256 digest of the ASCII-encoded token.
        """
        digest = sha256(self.token.encode("ascii")).digest()
        # Only the left-most half of the digest is encoded
        left_half = digest[: len(digest) // 2]
        return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer
OAuth2 access token, non-opaque using a JWT as identifier
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def id_token(self) -> IDToken:
    """Rebuild the ID token dataclass from the JSON kept in ``_id_token``."""
    from authentik.providers.oauth2.id_token import IDToken

    return from_dict(IDToken, json.loads(self._id_token))
Load ID Token from json
@property
def at_hash(self) -> str:
    """Get hashed access_token

    The value is the unpadded URL-safe base64 encoding of the left half of
    the SHA-256 digest of the ASCII-encoded token.
    """
    digest = sha256(self.token.encode("ascii")).digest()
    # Only the left-most half of the digest is encoded
    left_half = digest[: len(digest) // 2]
    return base64.urlsafe_b64encode(left_half).rstrip(b"=").decode("ascii")
Get hashed access_token
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class (not an instance) for this model."""
    from authentik.providers.oauth2.api.tokens import TokenModelSerializer

    return TokenModelSerializer
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.
class RefreshToken(InternallyManagedMixin, SerializerModel, ExpiringModel, BaseGrantModel):
    """OAuth2 Refresh Token, opaque"""

    token = models.TextField(default=generate_client_secret)
    # JSON dump of the ID token; read and write it through the `id_token` property.
    _id_token = models.TextField(verbose_name=_("ID Token"))
    # Shadow the `session` field from `BaseGrantModel` as we want refresh tokens to persist even
    # when the session is terminated.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    class Meta:
        indexes = ExpiringModel.Meta.indexes + [
            HashIndex(fields=["token"]),
        ]
        verbose_name = _("OAuth2 Refresh Token")
        verbose_name_plural = _("OAuth2 Refresh Tokens")

    def __str__(self):
        return f"Refresh Token for {self.provider_id} for user {self.user_id}"

    @property
    def id_token(self) -> IDToken:
        """Load ID Token from json"""
        from authentik.providers.oauth2.id_token import IDToken

        raw_token = json.loads(self._id_token)
        return from_dict(IDToken, raw_token)

    @id_token.setter
    def id_token(self, value: IDToken):
        # Only the JSON dump is stored; `token` is left untouched here.
        self._id_token = json.dumps(asdict(value))

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class (not an instance) for this model."""
        from authentik.providers.oauth2.api.tokens import TokenModelSerializer

        return TokenModelSerializer
OAuth2 Refresh Token, opaque
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
@property
def id_token(self) -> IDToken:
    """Rebuild the ID token dataclass from the JSON kept in ``_id_token``."""
    from authentik.providers.oauth2.id_token import IDToken

    return from_dict(IDToken, json.loads(self._id_token))
Load ID Token from json
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class (not an instance) for this model."""
    from authentik.providers.oauth2.api.tokens import TokenModelSerializer

    return TokenModelSerializer
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.
class DeviceToken(InternallyManagedMixin, ExpiringModel):
    """Temporary device token for OAuth device flow"""

    # Nullable: a device token can exist without an associated user.
    user = models.ForeignKey(
        "authentik_core.User", default=None, on_delete=models.CASCADE, null=True
    )
    provider = models.ForeignKey(OAuth2Provider, on_delete=models.CASCADE)
    # Both codes get generated default values.
    device_code = models.TextField(default=generate_key)
    user_code = models.TextField(default=generate_code_fixed_length)
    # Scopes are stored as a single whitespace-separated string; read and write
    # them through the `scope` property below.
    _scope = models.TextField(default="", verbose_name=_("Scopes"))
    # Deleting the session resets this link to its default (None) instead of
    # deleting the device token.
    session = models.ForeignKey(
        AuthenticatedSession, null=True, on_delete=models.SET_DEFAULT, default=None
    )

    @property
    def scope(self) -> list[str]:
        """Return scopes as list of strings"""
        return self._scope.split()

    @scope.setter
    def scope(self, value):
        # Persist the given scopes joined by single spaces.
        self._scope = " ".join(value)

    class Meta:
        verbose_name = _("Device Token")
        verbose_name_plural = _("Device Tokens")
        indexes = ExpiringModel.Meta.indexes

    def __str__(self):
        return f"Device Token for {self.provider_id}"
Temporary device token for OAuth device flow
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
@property
def scope(self) -> list[str]:
    """Split the stored scope string on whitespace and return the parts."""
    stored_scopes = self._scope
    return stored_scopes.split()
Return scopes as list of strings
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.