authentik.core.models
authentik core models
1"""authentik core models""" 2 3import re 4import traceback 5from datetime import datetime 6from enum import StrEnum 7from hashlib import sha256 8from typing import Any, Self 9from uuid import uuid4 10 11import pgtrigger 12from deepmerge import always_merger 13from django.contrib.auth.hashers import check_password, identify_hasher 14from django.contrib.auth.models import AbstractUser, Permission 15from django.contrib.auth.models import UserManager as DjangoUserManager 16from django.contrib.sessions.base_session import AbstractBaseSession 17from django.core.validators import validate_slug 18from django.db import models 19from django.db.models import Manager, Q, QuerySet, options 20from django.http import HttpRequest 21from django.utils.functional import cached_property 22from django.utils.timezone import now 23from django.utils.translation import gettext_lazy as _ 24from guardian.conf import settings 25from guardian.models import RoleModelPermission, RoleObjectPermission 26from model_utils.managers import InheritanceManager 27from psqlextra.indexes import UniqueIndex 28from psqlextra.models import PostgresMaterializedViewModel 29from rest_framework.serializers import Serializer 30from structlog.stdlib import get_logger 31 32from authentik.admin.files.fields import FileField 33from authentik.admin.files.manager import get_file_manager 34from authentik.admin.files.usage import FileUsage 35from authentik.blueprints.models import ManagedModel 36from authentik.core.expression.exceptions import PropertyMappingExpressionException 37from authentik.core.types import UILoginButton, UserSettingSerializer 38from authentik.lib.avatars import get_avatar 39from authentik.lib.expression.exceptions import ControlFlowException 40from authentik.lib.generators import generate_id 41from authentik.lib.merge import MERGE_LIST_UNIQUE 42from authentik.lib.models import ( 43 CreatedUpdatedModel, 44 DomainlessFormattedURLValidator, 45 SerializerModel, 46) 47from 
authentik.lib.utils.inheritance import get_deepest_child 48from authentik.lib.utils.reflection import class_to_path 49from authentik.lib.utils.time import timedelta_from_string 50from authentik.policies.models import PolicyBindingModel 51from authentik.rbac.models import Role 52from authentik.tenants.models import DEFAULT_TOKEN_DURATION, DEFAULT_TOKEN_LENGTH 53from authentik.tenants.utils import get_current_tenant, get_unique_identifier 54 55LOGGER = get_logger() 56USERNAME_MAX_LENGTH = 150 57USER_PATH_SYSTEM_PREFIX = "goauthentik.io" 58_USER_ATTR_PREFIX = f"{USER_PATH_SYSTEM_PREFIX}/user" 59USER_ATTRIBUTE_DEBUG = f"{_USER_ATTR_PREFIX}/debug" 60USER_ATTRIBUTE_GENERATED = f"{_USER_ATTR_PREFIX}/generated" 61USER_ATTRIBUTE_EXPIRES = f"{_USER_ATTR_PREFIX}/expires" 62USER_ATTRIBUTE_DELETE_ON_LOGOUT = f"{_USER_ATTR_PREFIX}/delete-on-logout" 63USER_ATTRIBUTE_SOURCES = f"{_USER_ATTR_PREFIX}/sources" 64USER_ATTRIBUTE_TOKEN_EXPIRING = f"{_USER_ATTR_PREFIX}/token-expires" # nosec 65USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME = f"{_USER_ATTR_PREFIX}/token-maximum-lifetime" # nosec 66USER_ATTRIBUTE_CHANGE_USERNAME = f"{_USER_ATTR_PREFIX}/can-change-username" 67USER_ATTRIBUTE_CHANGE_NAME = f"{_USER_ATTR_PREFIX}/can-change-name" 68USER_ATTRIBUTE_CHANGE_EMAIL = f"{_USER_ATTR_PREFIX}/can-change-email" 69USER_PATH_SERVICE_ACCOUNT = f"{USER_PATH_SYSTEM_PREFIX}/service-accounts" 70 71options.DEFAULT_NAMES = options.DEFAULT_NAMES + ( 72 # used_by API that allows models to specify if they shadow an object 73 # for example the proxy provider which is built on top of an oauth provider 74 "authentik_used_by_shadows", 75) 76 77GROUP_RECURSION_LIMIT = 20 78 79MANAGED_ROLE_PREFIX_USER = "ak-managed-role--user" 80MANAGED_ROLE_PREFIX_GROUP = "ak-managed-role--group" 81 82 83def managed_role_name(user_or_group: models.Model): 84 if isinstance(user_or_group, User): 85 return f"{MANAGED_ROLE_PREFIX_USER}-{user_or_group.pk}" 86 if isinstance(user_or_group, Group): 87 return 
class AttributesMixin(models.Model):
    """Abstract model mixin adding a JSON `attributes` field plus merge-aware update helpers."""

    # Free-form JSON attributes; a fresh empty dict by default.
    attributes = models.JSONField(default=dict, blank=True)

    class Meta:
        abstract = True

    def update_attributes(self, properties: dict[str, Any]):
        """Update fields and attributes, but correctly by merging dicts.

        Every key of `properties` other than `"attributes"` is assigned to the instance
        when its current value differs. `properties["attributes"]` (when present) is merged
        on top of the existing attributes using `MERGE_LIST_UNIQUE`. The instance is saved
        only if at least one field or the merged attributes actually changed.
        """
        from copy import deepcopy

        needs_update = False
        for key, value in properties.items():
            if key == "attributes":
                # Attributes are merged below instead of being overwritten.
                continue
            if getattr(self, key, None) != value:
                setattr(self, key, value)
                needs_update = True
        final_attributes = {}
        # Merge a deep copy of the current attributes. Merging the live dict into an empty
        # base can insert nested dicts/lists by reference (deepmerge-style mergers do so);
        # the next merge would then mutate `self.attributes` in place, and the comparison
        # below would not notice nested changes, skipping the save.
        MERGE_LIST_UNIQUE.merge(final_attributes, deepcopy(self.attributes))
        MERGE_LIST_UNIQUE.merge(final_attributes, properties.get("attributes", {}))
        if self.attributes != final_attributes:
            self.attributes = final_attributes
            needs_update = True
        if needs_update:
            self.save()

    @classmethod
    def update_or_create_attributes(
        cls, query: dict[str, Any], properties: dict[str, Any]
    ) -> tuple[Self, bool]:
        """Same as django's update_or_create but correctly updates attributes by merging dicts.

        Looks up the first instance matching `query`. When none exists, a new one is created
        from `properties` and `(instance, True)` is returned; otherwise the existing instance
        is updated through `update_attributes` and `(instance, False)` is returned.
        """
        instance = cls.objects.filter(**query).first()
        if not instance:
            return cls.objects.create(**properties), True
        instance.update_attributes(properties)
        return instance, False
@property
def num_pk(self) -> int:
    """Get a numerical, int32 ID for the group.

    The value is built from the integer form of the UUID primary key by
    keeping its first five decimal digits.
    """
    # int max is 2147483647 (10 digits) so 9 is the max usable
    # in the LDAP Outpost we use the last 5 chars so match here
    # NOTE(review): the comment above says "last 5 chars", but the slice below keeps the
    # *first* five digits of the integer. Confirm which one the outpost expects before
    # changing either side.
    return int(str(self.pk.int)[:5])
class GroupAncestryNode(PostgresMaterializedViewModel):
    """Row of the materialized view `authentik_core_groupancestry`.

    Each row links one group (`descendant`) to one of its direct or transitive
    ancestors (`ancestor`), as computed by the recursive query in `ViewMeta`.
    """

    # Reverse accessor on Group: `ancestor_nodes` yields the rows in which that group is
    # the descendant, i.e. the links pointing at its ancestors.
    # NOTE(review): on_delete=DO_NOTHING is presumably used because these rows belong to a
    # view and are not deleted by the ORM directly — confirm.
    descendant = models.ForeignKey(
        Group, related_name="ancestor_nodes", on_delete=models.DO_NOTHING
    )
    # Reverse accessor on Group: `descendant_nodes` yields the rows in which that group is
    # the ancestor.
    ancestor = models.ForeignKey(
        Group, related_name="descendant_nodes", on_delete=models.DO_NOTHING
    )

    class Meta:
        # This is a transitive closure of authentik_core_groupparentage
        # See https://en.wikipedia.org/wiki/Transitive_closure#In_graph_theory
        db_table = "authentik_core_groupancestry"
        # NOTE(review): the unique index on `id` is presumably what allows the
        # `REFRESH MATERIALIZED VIEW CONCURRENTLY` issued by the trigger on
        # GroupParentageNode (PostgreSQL requires a unique index for that) — confirm.
        indexes = [
            models.Index(fields=["descendant"]),
            models.Index(fields=["ancestor"]),
            UniqueIndex(fields=["id"]),
        ]

    class ViewMeta:
        # Recursive CTE:
        #  * base case: every parentage row becomes a (descendant, ancestor) pair;
        #  * recursive step: an existing pair is extended by one more parentage row whose
        #    child is the pair's current ancestor, producing (descendant, grand-ancestor);
        #  * `id` is the text concatenation "<descendant>-<ancestor>" of the pair;
        #  * UNION (not UNION ALL) drops duplicate pairs.
        query = """
            WITH RECURSIVE accumulator AS (
                SELECT
                    child_id::text || '-' || parent_id::text as id,
                    child_id AS descendant_id,
                    parent_id AS ancestor_id
                FROM authentik_core_groupparentage

                UNION

                SELECT
                    accumulator.descendant_id::text || '-' || current.parent_id::text as id,
                    accumulator.descendant_id,
                    current.parent_id AS ancestor_id
                FROM accumulator
                JOIN authentik_core_groupparentage current
                ON accumulator.ancestor_id = current.child_id
            )
            SELECT * FROM accumulator
        """

    def __str__(self) -> str:
        return f"Group Ancestry Node from {self.descendant_id} to {self.ancestor_id}"
class UserManager(DjangoUserManager):
    """User manager that doesn't assign is_superuser and is_staff"""

    def get_queryset(self):
        """Build the `UserQuerySet` for this manager's model and database alias."""
        queryset = UserQuerySet(self.model, using=self._db)
        return queryset

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create a user through `_create_user`, forwarding `extra_fields` unchanged."""
        created = self._create_user(username, email, password, **extra_fields)
        return created

    def exclude_anonymous(self) -> QuerySet:
        """Return this manager's queryset without the anonymous user."""
        everyone = self.get_queryset()
        return everyone.exclude_anonymous()
def all_roles(self) -> QuerySet[Role]:
    """Return the distinct roles held by this user, either directly or through any
    group returned by `all_groups()`."""
    assigned_directly = Q(users=self)
    assigned_via_group = Q(groups__in=self.all_groups())
    matching_roles = Role.objects.filter(assigned_directly | assigned_via_group)
    return matching_roles.distinct()
def remove_all_perms_from_managed_role(self):
    """Delete every model-level and object-level permission row attached to this
    user's managed role. Does nothing when the user has no managed role."""
    managed = self.get_managed_role()
    if managed:
        # Model permissions first, then object permissions.
        for permission_model in (RoleModelPermission, RoleObjectPermission):
            permission_model.objects.filter(role=managed).delete()
    return None
def app_entitlements(self, app: Application | None) -> QuerySet[ApplicationEntitlement]:
    """Get all entitlements this user has for `app`.

    Entitlements are selected through their enabled bindings. A binding matches when it is
    either
      * not negated and targets this user or one of the groups from `all_groups()`, or
      * negated and targets some other user, or some group the user is not part of.
    The result is ordered by entitlement name.

    When no `app` is given an empty queryset is returned.
    """
    if not app:
        # Return an empty queryset instead of a list: this honours the declared return
        # type and keeps queryset-only calls working, e.g. the `.values_list()` call in
        # `app_entitlements_attributes`, which raised AttributeError on a list.
        return ApplicationEntitlement.objects.none()
    all_groups = self.all_groups()
    qs = app.applicationentitlement_set.filter(
        Q(
            Q(bindings__user=self) | Q(bindings__group__in=all_groups),
            bindings__negate=False,
        )
        | Q(
            Q(~Q(bindings__user=self), bindings__user__isnull=False)
            | Q(~Q(bindings__group__in=all_groups), bindings__group__isnull=False),
            bindings__negate=True,
        ),
        bindings__enabled=True,
    ).order_by("name")
    return qs
def set_password(self, raw_password, signal=True, sender=None, request=None):
    """Set the user's password from a raw value.

    For an already persisted user (`pk` set) and `signal` enabled, the `password_changed`
    signal is sent first, with the user itself as sender unless another one is given.
    The password change date is stamped before delegating to the parent implementation.
    """
    if self.pk and signal:
        from authentik.core.signals import password_changed

        password_changed.send(
            sender=sender or self,
            user=self,
            password=raw_password,
            request=request,
        )
    self.password_change_date = now()
    return super().set_password(raw_password)
def locale(self, request: HttpRequest | None = None) -> str:
    """Get the locale the user has configured.

    Resolution order:
      1. `request.LANGUAGE_CODE`, when a request carrying that attribute is given.
      2. `attributes["settings"]["locale"]`, or an empty string when it is not set.
      3. Only if reading the attributes raised: the locale of the request's brand,
         when the request has one.
      4. An empty string.
    """
    if request and hasattr(request, "LANGUAGE_CODE"):
        return request.LANGUAGE_CODE
    try:
        return self.attributes.get("settings", {}).get("locale", "")

    except Exception as exc:  # noqa
        # Best effort: malformed attributes must not break locale resolution.
        LOGGER.warning("Failed to get default locale", exc=exc)
    # Guard the brand lookup the same way `group_attributes` does; a request without a
    # brand would otherwise raise AttributeError from inside this recovery path.
    if request and hasattr(request, "brand"):
        return request.brand.locale
    return ""
For example SAML2 Remote, OAuth2 Application""" 632 633 name = models.TextField(unique=True) 634 635 authentication_flow = models.ForeignKey( 636 "authentik_flows.Flow", 637 null=True, 638 on_delete=models.SET_NULL, 639 help_text=_( 640 "Flow used for authentication when the associated application is accessed by an " 641 "un-authenticated user." 642 ), 643 related_name="provider_authentication", 644 ) 645 authorization_flow = models.ForeignKey( 646 "authentik_flows.Flow", 647 # Set to cascade even though null is allowed, since most providers 648 # still require an authorization flow set 649 on_delete=models.CASCADE, 650 null=True, 651 help_text=_("Flow used when authorizing this provider."), 652 related_name="provider_authorization", 653 ) 654 invalidation_flow = models.ForeignKey( 655 "authentik_flows.Flow", 656 on_delete=models.SET_DEFAULT, 657 default=None, 658 null=True, 659 help_text=_("Flow used ending the session from a provider."), 660 related_name="provider_invalidation", 661 ) 662 663 property_mappings = models.ManyToManyField("PropertyMapping", default=None, blank=True) 664 665 backchannel_application = models.ForeignKey( 666 "Application", 667 default=None, 668 null=True, 669 on_delete=models.CASCADE, 670 help_text=_( 671 "Accessed from applications; optional backchannel providers for protocols " 672 "like LDAP and SCIM." 673 ), 674 related_name="backchannel_providers", 675 ) 676 677 is_backchannel = models.BooleanField(default=False) 678 679 objects = InheritanceManager() 680 681 @property 682 def launch_url(self) -> str | None: 683 """URL to this provider and initiate authorization for the user. 
class ApplicationQuerySet(QuerySet):
    """Queryset for applications that can eagerly load the related provider."""

    def with_provider(self) -> QuerySet[Application]:
        """Return a queryset that selects the provider and, for every provider subclass
        path, the subclass row together with its property mappings, application and
        backchannel application."""
        qs = self.select_related("provider")
        subclass_paths = Provider.objects.get_queryset()._get_subclasses_recurse(Provider)
        for subclass in subclass_paths:
            qs = (
                qs.select_related(f"provider__{subclass}")
                # Also prefetch/select through each subclass path so that casted
                # instances have access to these relations
                .prefetch_related(f"provider__{subclass}__property_mappings")
                .select_related(f"provider__{subclass}__application")
                .select_related(f"provider__{subclass}__backchannel_application")
            )
        return qs
Other authentication types can subclass this Model to 740 add custom fields and other properties""" 741 742 name = models.TextField(help_text=_("Application's display Name.")) 743 slug = models.TextField( 744 validators=[validate_slug], 745 help_text=_("Internal application name, used in URLs."), 746 unique=True, 747 ) 748 group = models.TextField(blank=True, default="") 749 750 provider = models.OneToOneField( 751 "Provider", null=True, blank=True, default=None, on_delete=models.SET_DEFAULT 752 ) 753 754 meta_launch_url = models.TextField( 755 default="", blank=True, validators=[DomainlessFormattedURLValidator()] 756 ) 757 758 open_in_new_tab = models.BooleanField( 759 default=False, help_text=_("Open launch URL in a new browser tab or window.") 760 ) 761 762 meta_icon = FileField(default="", blank=True) 763 meta_description = models.TextField(default="", blank=True) 764 meta_publisher = models.TextField(default="", blank=True) 765 meta_hide = models.BooleanField( 766 default=False, help_text=_("Hide this application from the user's My applications page.") 767 ) 768 769 objects = ApplicationQuerySet.as_manager() 770 771 # Reserved slugs that would clash with OAuth2 provider endpoints 772 reserved_slugs = ["authorize", "token", "device", "userinfo", "introspect", "revoke"] 773 774 @property 775 def serializer(self) -> Serializer: 776 from authentik.core.api.applications import ApplicationSerializer 777 778 return ApplicationSerializer 779 780 @property 781 def get_meta_icon(self) -> str | None: 782 """Get the URL to the App Icon image""" 783 if not self.meta_icon: 784 return None 785 786 return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon) 787 788 @property 789 def get_meta_icon_themed_urls(self) -> dict[str, str] | None: 790 """Get themed URLs for meta_icon if it contains %(theme)s""" 791 if not self.meta_icon: 792 return None 793 794 return get_file_manager(FileUsage.MEDIA).themed_urls(self.meta_icon) 795 796 def get_launch_url(self, user: User | 
def get_provider(self) -> Provider | None:
    """Get casted provider instance. Needs Application queryset with_provider

    The result (including None for an application without provider) is memoised on the
    instance as `_cached_provider`.
    """
    try:
        return self._cached_provider
    except AttributeError:
        # Not resolved yet for this instance.
        pass
    base_provider = self.provider
    self._cached_provider = get_deepest_child(base_provider) if base_provider else None
    return self._cached_provider
class SourceUserMatchingModes(models.TextChoices):
    """Different modes a source can handle new/returning users

    Each member pairs the stored value with its translatable, human-readable label.
    """

    # Match on the identifier supplied by the source itself.
    IDENTIFIER = "identifier", _("Use the source-specific identifier")
    # Match on email address and link to the existing user.
    EMAIL_LINK = (
        "email_link",
        _(
            "Link to a user with identical email address. Can have security implications "
            "when a source doesn't validate email addresses."
        ),
    )
    # Match on email address, refusing enrollment if it is already taken.
    EMAIL_DENY = (
        "email_deny",
        _(
            "Use the user's email address, but deny enrollment when the email address already "
            "exists."
        ),
    )
    # Match on username and link to the existing user.
    USERNAME_LINK = (
        "username_link",
        _(
            "Link to a user with identical username. Can have security implications "
            "when a username is used with another source."
        ),
    )
    # Match on username, refusing enrollment if it is already taken.
    USERNAME_DENY = (
        "username_deny",
        _("Use the user's username, but deny enrollment when the username already exists."),
    )
912 ), 913 ) 914 NAME_DENY = ( 915 "name_deny", 916 _("Use the group name, but deny enrollment when the name already exists."), 917 ) 918 919 920class Source(ManagedModel, SerializerModel, PolicyBindingModel): 921 """Base Authentication source, i.e. an OAuth Provider, SAML Remote or LDAP Server""" 922 923 MANAGED_INBUILT = "goauthentik.io/sources/inbuilt" 924 925 name = models.TextField(help_text=_("Source's display Name.")) 926 slug = models.TextField( 927 validators=[validate_slug], 928 help_text=_("Internal source name, used in URLs."), 929 unique=True, 930 ) 931 932 user_path_template = models.TextField(default="goauthentik.io/sources/%(slug)s") 933 934 enabled = models.BooleanField(default=True) 935 promoted = models.BooleanField( 936 default=False, 937 help_text=_( 938 "When enabled, this source will be displayed as a prominent button on the " 939 "login page, instead of a small icon." 940 ), 941 ) 942 user_property_mappings = models.ManyToManyField( 943 "PropertyMapping", default=None, blank=True, related_name="source_userpropertymappings_set" 944 ) 945 group_property_mappings = models.ManyToManyField( 946 "PropertyMapping", default=None, blank=True, related_name="source_grouppropertymappings_set" 947 ) 948 949 icon = FileField(blank=True, default="") 950 951 authentication_flow = models.ForeignKey( 952 "authentik_flows.Flow", 953 blank=True, 954 null=True, 955 default=None, 956 on_delete=models.SET_NULL, 957 help_text=_("Flow to use when authenticating existing users."), 958 related_name="source_authentication", 959 ) 960 enrollment_flow = models.ForeignKey( 961 "authentik_flows.Flow", 962 blank=True, 963 null=True, 964 default=None, 965 on_delete=models.SET_NULL, 966 help_text=_("Flow to use when enrolling new users."), 967 related_name="source_enrollment", 968 ) 969 970 user_matching_mode = models.TextField( 971 choices=SourceUserMatchingModes.choices, 972 default=SourceUserMatchingModes.IDENTIFIER, 973 help_text=_( 974 "How the source determines if an 
existing user should be authenticated or " 975 "a new user enrolled." 976 ), 977 ) 978 group_matching_mode = models.TextField( 979 choices=SourceGroupMatchingModes.choices, 980 default=SourceGroupMatchingModes.IDENTIFIER, 981 help_text=_( 982 "How the source determines if an existing group should be used or a new group created." 983 ), 984 ) 985 986 objects = InheritanceManager() 987 988 def get_icon_url(self, request=None, use_cache: bool = True) -> str | None: 989 """Get the URL to the source icon.""" 990 if not self.icon: 991 return None 992 return get_file_manager(FileUsage.MEDIA).file_url(self.icon, request, use_cache=use_cache) 993 994 @property 995 def icon_url(self) -> str | None: 996 """Get the URL to the source icon""" 997 return self.get_icon_url() 998 999 def get_icon_themed_urls( 1000 self, 1001 request=None, 1002 use_cache: bool = True, 1003 ) -> dict[str, str] | None: 1004 """Get themed URLs for icon if it contains %(theme)s.""" 1005 if not self.icon: 1006 return None 1007 return get_file_manager(FileUsage.MEDIA).themed_urls( 1008 self.icon, 1009 request, 1010 use_cache=use_cache, 1011 ) 1012 1013 @property 1014 def icon_themed_urls(self) -> dict[str, str] | None: 1015 return self.get_icon_themed_urls() 1016 1017 def get_user_path(self) -> str: 1018 """Get user path, fallback to default for formatting errors""" 1019 try: 1020 return self.user_path_template % { 1021 "slug": self.slug, 1022 } 1023 1024 except Exception as exc: # noqa 1025 LOGGER.warning("Failed to template user path", exc=exc, source=self) 1026 return User.default_path() 1027 1028 @property 1029 def component(self) -> str: 1030 """Return component used to edit this object""" 1031 if self.managed == self.MANAGED_INBUILT: 1032 return "" 1033 raise NotImplementedError 1034 1035 @property 1036 def property_mapping_type(self) -> type[PropertyMapping]: 1037 """Return property mapping type used by this object""" 1038 if self.managed == self.MANAGED_INBUILT: 1039 from authentik.core.models 
import PropertyMapping 1040 1041 return PropertyMapping 1042 raise NotImplementedError 1043 1044 def ui_login_button(self, request: HttpRequest) -> UILoginButton | None: 1045 """If source uses a http-based flow, return UI Information about the login 1046 button. If source doesn't use http-based flow, return None.""" 1047 return None 1048 1049 def ui_user_settings(self) -> UserSettingSerializer | None: 1050 """Entrypoint to integrate with User settings. Can either return None if no 1051 user settings are available, or UserSettingSerializer.""" 1052 return None 1053 1054 def get_base_user_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]: 1055 """Get base properties for a user to build final properties upon.""" 1056 if self.managed == self.MANAGED_INBUILT: 1057 return {} 1058 raise NotImplementedError 1059 1060 def get_base_group_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]: 1061 """Get base properties for a group to build final properties upon.""" 1062 if self.managed == self.MANAGED_INBUILT: 1063 return {} 1064 raise NotImplementedError 1065 1066 def __str__(self): 1067 return str(self.name) 1068 1069 class Meta: 1070 indexes = [ 1071 models.Index( 1072 fields=[ 1073 "slug", 1074 ] 1075 ), 1076 models.Index( 1077 fields=[ 1078 "name", 1079 ] 1080 ), 1081 models.Index( 1082 fields=[ 1083 "enabled", 1084 ] 1085 ), 1086 ] 1087 1088 1089class UserSourceConnection(SerializerModel, CreatedUpdatedModel): 1090 """Connection between User and Source.""" 1091 1092 user = models.ForeignKey(User, on_delete=models.CASCADE) 1093 source = models.ForeignKey(Source, on_delete=models.CASCADE) 1094 identifier = models.TextField() 1095 1096 objects = InheritanceManager() 1097 1098 @property 1099 def serializer(self) -> type[Serializer]: 1100 """Get serializer for this model""" 1101 raise NotImplementedError 1102 1103 def __str__(self) -> str: 1104 return f"User-source connection (user={self.user_id}, source={self.source_id})" 1105 1106 class Meta: 1107 
unique_together = (("user", "source"),) 1108 indexes = ( 1109 models.Index(fields=("identifier",)), 1110 models.Index(fields=("source", "identifier")), 1111 ) 1112 1113 1114class GroupSourceConnection(SerializerModel, CreatedUpdatedModel): 1115 """Connection between Group and Source.""" 1116 1117 group = models.ForeignKey(Group, on_delete=models.CASCADE) 1118 source = models.ForeignKey(Source, on_delete=models.CASCADE) 1119 identifier = models.TextField() 1120 1121 objects = InheritanceManager() 1122 1123 @property 1124 def serializer(self) -> type[Serializer]: 1125 """Get serializer for this model""" 1126 raise NotImplementedError 1127 1128 def __str__(self) -> str: 1129 return f"Group-source connection (group={self.group_id}, source={self.source_id})" 1130 1131 class Meta: 1132 unique_together = (("group", "source"),) 1133 1134 1135class ExpiringManager(Manager): 1136 """Manager for expiring objects which filters out expired objects by default""" 1137 1138 def get_queryset(self): 1139 return QuerySet(self.model, using=self._db).exclude(expires__lt=now(), expiring=True) 1140 1141 def including_expired(self): 1142 return QuerySet(self.model, using=self._db) 1143 1144 1145class ExpiringModel(models.Model): 1146 """Base Model which can expire, and is automatically cleaned up.""" 1147 1148 expires = models.DateTimeField(default=None, null=True) 1149 expiring = models.BooleanField(default=True) 1150 1151 objects = ExpiringManager() 1152 1153 class Meta: 1154 abstract = True 1155 indexes = [ 1156 models.Index(fields=["expires"]), 1157 models.Index(fields=["expiring"]), 1158 models.Index(fields=["expiring", "expires"]), 1159 ] 1160 1161 def expire_action(self, *args, **kwargs): 1162 """Handler which is called when this object is expired. By 1163 default the object is deleted. 
This is less efficient compared 1164 to bulk deleting objects, but classes like Token() need to change 1165 values instead of being deleted.""" 1166 try: 1167 return self.delete(*args, **kwargs) 1168 except self.DoesNotExist: 1169 # Object has already been deleted, so this should be fine 1170 return None 1171 1172 @classmethod 1173 def filter_not_expired(cls, **kwargs) -> QuerySet[Self]: 1174 """Filer for tokens which are not expired yet or are not expiring, 1175 and match filters in `kwargs`""" 1176 from authentik.events.models import Event 1177 1178 deprecation_id = f"{class_to_path(cls)}.filter_not_expired" 1179 1180 Event.log_deprecation( 1181 deprecation_id, 1182 message=( 1183 ".filter_not_expired() is deprecated as the default lookup now excludes " 1184 "expired objects." 1185 ), 1186 ) 1187 1188 for obj in ( 1189 cls.objects.including_expired() 1190 .filter(**kwargs) 1191 .filter(Q(expires__lt=now(), expiring=True)) 1192 ): 1193 obj.delete() 1194 return cls.objects.filter(**kwargs) 1195 1196 @property 1197 def is_expired(self) -> bool: 1198 """Check if token is expired yet.""" 1199 if not self.expiring: 1200 return False 1201 return now() > self.expires 1202 1203 1204class TokenIntents(models.TextChoices): 1205 """Intents a Token can be created for.""" 1206 1207 # Single use token 1208 INTENT_VERIFICATION = "verification" 1209 1210 # Allow access to API 1211 INTENT_API = "api" 1212 1213 # Recovery use for the recovery app 1214 INTENT_RECOVERY = "recovery" 1215 1216 # App-specific passwords 1217 INTENT_APP_PASSWORD = "app_password" # nosec 1218 1219 1220class Token(SerializerModel, ManagedModel, ExpiringModel): 1221 """Token used to authenticate the User for API Access or confirm another Stage like Email.""" 1222 1223 token_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) 1224 identifier = models.SlugField(max_length=255, unique=True) 1225 key = models.TextField(default=default_token_key) 1226 intent = models.TextField( 1227 
choices=TokenIntents.choices, default=TokenIntents.INTENT_VERIFICATION 1228 ) 1229 user = models.ForeignKey("User", on_delete=models.CASCADE, related_name="+") 1230 description = models.TextField(default="", blank=True) 1231 1232 class Meta: 1233 verbose_name = _("Token") 1234 verbose_name_plural = _("Tokens") 1235 indexes = ExpiringModel.Meta.indexes + [ 1236 models.Index(fields=["identifier"]), 1237 models.Index(fields=["key"]), 1238 ] 1239 permissions = [ 1240 ("view_token_key", _("View token's key")), 1241 ("set_token_key", _("Set a token's key")), 1242 ] 1243 1244 def __str__(self): 1245 description = f"{self.identifier}" 1246 if self.expiring: 1247 description += f" (expires={self.expires})" 1248 return description 1249 1250 @property 1251 def serializer(self) -> type[Serializer]: 1252 from authentik.core.api.tokens import TokenSerializer 1253 1254 return TokenSerializer 1255 1256 def expire_action(self, *args, **kwargs): 1257 """Handler which is called when this object is expired.""" 1258 from authentik.events.models import Event, EventAction 1259 1260 if self.intent in [ 1261 TokenIntents.INTENT_RECOVERY, 1262 TokenIntents.INTENT_VERIFICATION, 1263 TokenIntents.INTENT_APP_PASSWORD, 1264 ]: 1265 super().expire_action(*args, **kwargs) 1266 return 1267 1268 self.key = default_token_key() 1269 self.expires = default_token_duration() 1270 self.save(*args, **kwargs) 1271 Event.new( 1272 action=EventAction.SECRET_ROTATE, 1273 token=self, 1274 message=f"Token {self.identifier}'s secret was rotated.", 1275 ).save() 1276 1277 1278class PropertyMapping(SerializerModel, ManagedModel): 1279 """User-defined key -> x mapping which can be used by providers to expose extra data.""" 1280 1281 pm_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) 1282 name = models.TextField(unique=True) 1283 expression = models.TextField() 1284 1285 objects = InheritanceManager() 1286 1287 @property 1288 def component(self) -> str: 1289 """Return component used to edit 
this object""" 1290 raise NotImplementedError 1291 1292 @property 1293 def serializer(self) -> type[Serializer]: 1294 """Get serializer for this model""" 1295 raise NotImplementedError 1296 1297 def evaluate(self, user: User | None, request: HttpRequest | None, **kwargs) -> Any: 1298 """Evaluate `self.expression` using `**kwargs` as Context.""" 1299 from authentik.core.expression.evaluator import PropertyMappingEvaluator 1300 1301 evaluator = PropertyMappingEvaluator(self, user, request, **kwargs) 1302 try: 1303 return evaluator.evaluate(self.expression) 1304 except ControlFlowException as exc: 1305 raise exc 1306 except Exception as exc: 1307 raise PropertyMappingExpressionException(exc, self) from exc 1308 1309 def __str__(self): 1310 return f"Property Mapping {self.name}" 1311 1312 class Meta: 1313 verbose_name = _("Property Mapping") 1314 verbose_name_plural = _("Property Mappings") 1315 1316 1317class Session(ExpiringModel, AbstractBaseSession): 1318 """User session with extra fields for fast access""" 1319 1320 # Remove upstream field because we're using our own ExpiringModel 1321 expire_date = None 1322 session_data = models.BinaryField(_("session data")) 1323 1324 # Keep in sync with Session.Keys 1325 last_ip = models.GenericIPAddressField() 1326 last_user_agent = models.TextField(blank=True) 1327 last_used = models.DateTimeField(auto_now=True) 1328 1329 class Meta: 1330 verbose_name = _("Session") 1331 verbose_name_plural = _("Sessions") 1332 indexes = ExpiringModel.Meta.indexes + [ 1333 models.Index(fields=["expires", "session_key"]), 1334 ] 1335 default_permissions = [] 1336 1337 def __str__(self): 1338 return self.session_key 1339 1340 class Keys(StrEnum): 1341 """ 1342 Keys to be set with the session interface for the fields above to be updated. 
1343 1344 If a field is added here that needs to be initialized when the session is initialized, 1345 it must also be reflected in authentik.root.middleware.SessionMiddleware.process_request 1346 and in authentik.core.sessions.SessionStore.__init__ 1347 """ 1348 1349 LAST_IP = "last_ip" 1350 LAST_USER_AGENT = "last_user_agent" 1351 LAST_USED = "last_used" 1352 1353 @classmethod 1354 def get_session_store_class(cls): 1355 from authentik.core.sessions import SessionStore 1356 1357 return SessionStore 1358 1359 def get_decoded(self): 1360 raise NotImplementedError 1361 1362 1363class AuthenticatedSession(SerializerModel): 1364 session = models.OneToOneField(Session, on_delete=models.CASCADE, primary_key=True) 1365 # We use the session as primary key, but we need the API to be able to reference 1366 # this object uniquely without exposing the session key 1367 uuid = models.UUIDField(default=uuid4, unique=True) 1368 1369 user = models.ForeignKey(User, on_delete=models.CASCADE) 1370 1371 @property 1372 def serializer(self) -> type[Serializer]: 1373 from authentik.core.api.authenticated_sessions import AuthenticatedSessionSerializer 1374 1375 return AuthenticatedSessionSerializer 1376 1377 class Meta: 1378 verbose_name = _("Authenticated Session") 1379 verbose_name_plural = _("Authenticated Sessions") 1380 1381 def __str__(self) -> str: 1382 return f"Authenticated Session {str(self.pk)[:10]}" 1383 1384 @staticmethod 1385 def from_request(request: HttpRequest, user: User) -> AuthenticatedSession | None: 1386 """Create a new session from a http request""" 1387 if not hasattr(request, "session") or not request.session.exists( 1388 request.session.session_key 1389 ): 1390 return None 1391 return AuthenticatedSession( 1392 session=Session.objects.filter(session_key=request.session.session_key).first(), 1393 user=user, 1394 )
def managed_role_name(user_or_group: models.Model):
    """Build the name of the managed role that belongs to a user or a group.

    Args:
        user_or_group: A ``User`` or ``Group`` instance.

    Returns:
        The model-specific managed-role prefix and the object's primary key,
        joined by ``-``.

    Raises:
        TypeError: If ``user_or_group`` is neither a ``User`` nor a ``Group``.
    """
    # Checked in the same order as before: User first, then Group
    prefix_by_model = (
        (User, MANAGED_ROLE_PREFIX_USER),
        (Group, MANAGED_ROLE_PREFIX_GROUP),
    )
    for model, prefix in prefix_by_model:
        if isinstance(user_or_group, model):
            return f"{prefix}-{user_or_group.pk}"
    raise TypeError("Managed roles are only available for User or Group.")
def default_token_duration() -> datetime:
    """Return the point in time at which a token created right now expires.

    The validity period is taken from the current tenant's
    ``default_token_duration`` attribute; when the tenant object has no such
    attribute, ``DEFAULT_TOKEN_DURATION`` is used instead.
    """
    tenant = get_current_tenant()
    duration = getattr(tenant, "default_token_duration", DEFAULT_TOKEN_DURATION)
    return now() + timedelta_from_string(duration)
Default duration a Token is valid
def default_token_key() -> str:
    """Generate a fresh token key.

    The key length is taken from the current tenant's ``default_token_length``
    attribute; when the tenant object has no such attribute,
    ``DEFAULT_TOKEN_LENGTH`` is used instead.
    """
    tenant = get_current_tenant()
    length = getattr(tenant, "default_token_length", DEFAULT_TOKEN_LENGTH)
    # generate_id is used because the characters in the key should be easy
    # to use in Emails (for verification) and URLs (for recovery)
    return generate_id(length)
Default token key
class UserTypes(models.TextChoices):
    """Types a user can have.

    Used for grouping and licensing, and for permissions in the case of the
    internal_service_account type.
    """

    INTERNAL = "internal"
    EXTERNAL = "external"

    # User-created service accounts
    SERVICE_ACCOUNT = "service_account"

    # Special user type for internally managed and created service
    # accounts, such as outpost users
    INTERNAL_SERVICE_ACCOUNT = "internal_service_account"
User types, both for grouping, licensing and permissions in the case of the internal_service_account
class AttributesMixin(models.Model):
    """Abstract mixin adding a JSON ``attributes`` field and merge-aware update helpers"""

    # Free-form JSON data; defaults to an empty dict
    attributes = models.JSONField(default=dict, blank=True)

    class Meta:
        abstract = True

    def update_attributes(self, properties: dict[str, Any]):
        """Update fields and attributes, merging attribute dicts instead of replacing them.

        Every key in ``properties`` other than ``"attributes"`` is set as a plain
        attribute on the instance when its value differs from the current one.
        ``properties["attributes"]`` (if present) is merged on top of the existing
        ``self.attributes`` using ``MERGE_LIST_UNIQUE``. The instance is saved only
        if something actually changed.

        Args:
            properties: Field values to apply, optionally including an
                ``"attributes"`` dict to merge.
        """
        from copy import deepcopy

        needs_update = False
        for key, value in properties.items():
            if key == "attributes":
                continue
            if getattr(self, key, None) != value:
                setattr(self, key, value)
                needs_update = True
        # Merge deep copies rather than the live objects. The merger is assumed to
        # follow deepmerge semantics (values inserted by reference, nested dicts
        # merged in place); without copies, a change inside a nested dict would be
        # written straight into `self.attributes`, the comparison below would see
        # no difference and the change would never be saved.
        final_attributes = {}
        MERGE_LIST_UNIQUE.merge(final_attributes, deepcopy(self.attributes))
        MERGE_LIST_UNIQUE.merge(final_attributes, deepcopy(properties.get("attributes", {})))
        if self.attributes != final_attributes:
            self.attributes = final_attributes
            needs_update = True
        if needs_update:
            self.save()

    @classmethod
    def update_or_create_attributes(
        cls, query: dict[str, Any], properties: dict[str, Any]
    ) -> tuple[Self, bool]:
        """Same as django's update_or_create but correctly updates attributes by merging dicts.

        Args:
            query: Lookup used to find an existing instance.
            properties: Values used to create a new instance, or passed to
                ``update_attributes`` of the existing one.

        Returns:
            A tuple of the instance and a flag that is True when it was created.
        """
        instance = cls.objects.filter(**query).first()
        if not instance:
            return cls.objects.create(**properties), True
        instance.update_attributes(properties)
        return instance, False
Adds an attributes property to a model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
def update_attributes(self, properties: dict[str, Any]):
    """Update fields and attributes, merging attribute dicts instead of replacing them.

    Every key in ``properties`` other than ``"attributes"`` is set as a plain
    attribute on the instance when its value differs from the current one.
    ``properties["attributes"]`` (if present) is merged on top of the existing
    ``self.attributes`` using ``MERGE_LIST_UNIQUE``. The instance is saved only
    if something actually changed.

    Args:
        properties: Field values to apply, optionally including an
            ``"attributes"`` dict to merge.
    """
    from copy import deepcopy

    needs_update = False
    for key, value in properties.items():
        if key == "attributes":
            continue
        if getattr(self, key, None) != value:
            setattr(self, key, value)
            needs_update = True
    # Merge deep copies rather than the live objects. The merger is assumed to
    # follow deepmerge semantics (values inserted by reference, nested dicts
    # merged in place); without copies, a change inside a nested dict would be
    # written straight into `self.attributes`, the comparison below would see
    # no difference and the change would never be saved.
    final_attributes = {}
    MERGE_LIST_UNIQUE.merge(final_attributes, deepcopy(self.attributes))
    MERGE_LIST_UNIQUE.merge(final_attributes, deepcopy(properties.get("attributes", {})))
    if self.attributes != final_attributes:
        self.attributes = final_attributes
        needs_update = True
    if needs_update:
        self.save()
Update fields and attributes, but correctly by merging dicts
@classmethod
def update_or_create_attributes(
    cls, query: dict[str, Any], properties: dict[str, Any]
) -> tuple[Self, bool]:
    """Counterpart of django's update_or_create that merges attribute dicts.

    Args:
        query: Lookup used to find an existing instance.
        properties: Values used to create a new instance, or handed to
            ``update_attributes`` of the existing one.

    Returns:
        A tuple of the instance and a flag that is True when it was created.
    """
    existing = cls.objects.filter(**query).first()
    if existing:
        existing.update_attributes(properties)
        return existing, False
    return cls.objects.create(**properties), True
Same as django's update_or_create but correctly updates attributes by merging dicts
class GroupQuerySet(QuerySet):
    """QuerySet for groups that can widen itself along the group hierarchy"""

    def with_descendants(self):
        """Return the groups in this queryset plus every group that has one of
        them as an ancestor (via the ``ancestor_nodes`` relation)."""
        own_pks = self.values_list("pk", flat=True)
        selection = Q(pk__in=own_pks) | Q(ancestor_nodes__ancestor__in=own_pks)
        return Group.objects.filter(selection).distinct()

    def with_ancestors(self):
        """Return the groups in this queryset plus every group that has one of
        them as a descendant (via the ``descendant_nodes`` relation)."""
        own_pks = self.values_list("pk", flat=True)
        selection = Q(pk__in=own_pks) | Q(descendant_nodes__descendant__in=own_pks)
        return Group.objects.filter(selection).distinct()
Represent a lazy database lookup for a set of objects.
class Group(SerializerModel, AttributesMixin):
    """Group model which supports a hierarchy and has attributes"""

    group_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)

    name = models.TextField(verbose_name=_("name"), unique=True)
    is_superuser = models.BooleanField(
        default=False, help_text=_("Users added to this group will be superusers.")
    )

    roles = models.ManyToManyField("authentik_rbac.Role", related_name="groups", blank=True)

    # Direct parents of this group; the reverse accessor is `children`
    parents = models.ManyToManyField(
        "Group",
        blank=True,
        symmetrical=False,
        through="GroupParentageNode",
        related_name="children",
    )

    objects = GroupQuerySet.as_manager()

    class Meta:
        indexes = (
            models.Index(fields=["name"]),
            models.Index(fields=["is_superuser"]),
        )
        verbose_name = _("Group")
        verbose_name_plural = _("Groups")
        permissions = [
            ("add_user_to_group", _("Add user to group")),
            ("remove_user_from_group", _("Remove user from group")),
            ("enable_group_superuser", _("Enable superuser status")),
            ("disable_group_superuser", _("Disable superuser status")),
        ]

    def __str__(self):
        return f"Group {self.name}"

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer class for this model"""
        # Imported lazily, inside the property, as in the other models of this file
        from authentik.core.api.groups import GroupSerializer

        return GroupSerializer

    @property
    def num_pk(self) -> int:
        """Get a numerical, int32 ID for the group"""
        # int max is 2147483647 (10 digits) so 9 is the max usable.
        # The leading 5 digits of the UUID's integer value are used.
        # NOTE(review): the previous comment said the LDAP Outpost uses the
        # "last 5 chars" while this slice takes the first five; confirm which
        # side is authoritative.
        return int(str(self.pk.int)[:5])

    def is_member(self, user: User) -> bool:
        """Recursively check if `user` is member of us, or any parent."""
        return user.all_groups().filter(group_uuid=self.group_uuid).exists()

    def all_roles(self) -> QuerySet[Role]:
        """Get all roles of this group and all of its ancestors."""
        return Role.objects.filter(
            groups__in=Group.objects.filter(pk=self.pk).with_ancestors()
        ).distinct()

    def get_managed_role(self, create=False):
        """Get the managed role of this group.

        Args:
            create: When True, the role is created if it does not exist yet and
                this group is added to a newly created role.

        Returns:
            The managed role; None when it does not exist and `create` is False.
        """
        name = managed_role_name(self)
        if not create:
            return Role.objects.filter(name=name).first()
        role, created = Role.objects.get_or_create(name=name, managed=name)
        if created:
            role.groups.add(self)
        return role

    def assign_perms_to_managed_role(
        self,
        perms: str | list[str] | Permission | list[Permission],
        obj: models.Model | None = None,
    ):
        """Assign permissions to this group's managed role, creating the role if needed.

        Does nothing when `perms` is empty.

        Args:
            perms: Permission(s) to assign.
            obj: Optional object passed on to the role's `assign_perms`.
        """
        if not perms:
            return
        role = self.get_managed_role(create=True)
        role.assign_perms(perms, obj)
Group model which supports a hierarchy and has attributes
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer class for this model"""
    # The serializer class itself is returned (not an instance), hence `type[...]`,
    # matching the `serializer` properties of the other models in this file.
    from authentik.core.api.groups import GroupSerializer

    return GroupSerializer
Get serializer for this model
@property
def num_pk(self) -> int:
    """Derive a numerical ID for the group that fits into an int32"""
    # int32 max is 2147483647 (10 digits), so at most 9 digits are usable;
    # only the leading five digits of the UUID's integer value are kept.
    # NOTE(review): the LDAP Outpost is said to use 5 chars as well; confirm
    # it takes the same end of the number.
    digits = str(self.pk.int)
    return int(digits[:5])
Get a numerical, int32 ID for the group
def is_member(self, user: User) -> bool:
    """Check whether this group is among the groups returned by `user.all_groups()`."""
    matching_groups = user.all_groups().filter(group_uuid=self.group_uuid)
    return matching_groups.exists()
Recursively check if user is member of us, or any parent.
def all_roles(self) -> QuerySet[Role]:
    """Collect the distinct roles of this group and of all of its ancestors."""
    # This group itself plus everything above it in the hierarchy
    lineage = Group.objects.filter(pk=self.pk).with_ancestors()
    return Role.objects.filter(groups__in=lineage).distinct()
Get all roles of this group and all of its ancestors.
def get_managed_role(self, create=False):
    """Look up the managed role of this group, optionally creating it.

    With `create` set, the role is fetched or created, and this group is added
    to it when it was newly created. Otherwise the first role with the managed
    name is returned, or None if there is none.
    """
    role_name = managed_role_name(self)
    if not create:
        return Role.objects.filter(name=role_name).first()
    role, was_created = Role.objects.get_or_create(name=role_name, managed=role_name)
    if was_created:
        role.groups.add(self)
    return role
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
The requested object does not exist
The query returned multiple objects when only one was expected.
class GroupParentageNode(models.Model):
    """A single direct child -> parent edge between two groups.

    Rows are stored in ``authentik_core_groupparentage``. A database trigger declared in
    ``Meta.triggers`` refreshes the ``authentik_core_groupancestry`` materialized view
    after rows of this table are inserted, updated or deleted.
    """

    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)

    # Both ends of the edge point at Group; deleting either group removes the edge.
    child = models.ForeignKey(Group, related_name="parent_nodes", on_delete=models.CASCADE)
    parent = models.ForeignKey(Group, related_name="child_nodes", on_delete=models.CASCADE)

    class Meta:
        verbose_name = _("Group Parentage Node")
        verbose_name_plural = _("Group Parentage Nodes")

        db_table = "authentik_core_groupparentage"

        triggers = [
            # Fires after any insert/update/delete and refreshes the ancestry view.
            # The function returns NULL, i.e. it does not produce a row of its own.
            pgtrigger.Trigger(
                name="refresh_groupancestry",
                operation=pgtrigger.Insert | pgtrigger.Update | pgtrigger.Delete,
                when=pgtrigger.After,
                func="""
                    REFRESH MATERIALIZED VIEW CONCURRENTLY authentik_core_groupancestry;
                    RETURN NULL;
                """,
            ),
        ]

    def __str__(self) -> str:
        return f"Group Parentage Node from #{self.child_id} to {self.parent_id}"
GroupParentageNode(uuid, child, parent)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
The requested object does not exist
The query returned multiple objects when only one was expected.
class GroupAncestryNode(PostgresMaterializedViewModel):
    """One (descendant, ancestor) pair of groups, direct or indirect.

    Backed by ``authentik_core_groupancestry``; the rows are produced by the recursive
    query in ``ViewMeta.query`` over ``authentik_core_groupparentage``.
    """

    descendant = models.ForeignKey(
        Group, related_name="ancestor_nodes", on_delete=models.DO_NOTHING
    )
    ancestor = models.ForeignKey(
        Group, related_name="descendant_nodes", on_delete=models.DO_NOTHING
    )

    class Meta:
        # This is a transitive closure of authentik_core_groupparentage
        # See https://en.wikipedia.org/wiki/Transitive_closure#In_graph_theory
        db_table = "authentik_core_groupancestry"
        indexes = [
            models.Index(fields=["descendant"]),
            models.Index(fields=["ancestor"]),
            # NOTE(review): presumably needed so the view can be refreshed CONCURRENTLY
            # (PostgreSQL requires a unique index for that) - confirm.
            UniqueIndex(fields=["id"]),
        ]

    class ViewMeta:
        # Base case: every direct child -> parent edge.
        # Recursive case: extend a known (descendant, ancestor) pair by one more parent edge.
        # ``id`` is synthesized as "<descendant_id>-<ancestor_id>" text.
        query = """
            WITH RECURSIVE accumulator AS (
                SELECT
                    child_id::text || '-' || parent_id::text as id,
                    child_id AS descendant_id,
                    parent_id AS ancestor_id
                FROM authentik_core_groupparentage

                UNION

                SELECT
                    accumulator.descendant_id::text || '-' || current.parent_id::text as id,
                    accumulator.descendant_id,
                    current.parent_id AS ancestor_id
                FROM accumulator
                JOIN authentik_core_groupparentage current
                ON accumulator.ancestor_id = current.child_id
            )
            SELECT * FROM accumulator
        """

    def __str__(self) -> str:
        return f"Group Ancestry Node from {self.descendant_id} to {self.ancestor_id}"
GroupAncestryNode(id, descendant, ancestor)
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
# Query that defines the rows of the ancestry view.
# Base case: every direct child -> parent edge of authentik_core_groupparentage.
# Recursive case: extend a known (descendant, ancestor) pair by one more parent edge.
# ``id`` is synthesized as "<descendant_id>-<ancestor_id>" text.
class ViewMeta:
    query = """
        WITH RECURSIVE accumulator AS (
            SELECT
                child_id::text || '-' || parent_id::text as id,
                child_id AS descendant_id,
                parent_id AS ancestor_id
            FROM authentik_core_groupparentage

            UNION

            SELECT
                accumulator.descendant_id::text || '-' || current.parent_id::text as id,
                accumulator.descendant_id,
                current.parent_id AS ancestor_id
            FROM accumulator
            JOIN authentik_core_groupparentage current
            ON accumulator.ancestor_id = current.child_id
        )
        SELECT * FROM accumulator
    """
The requested object does not exist
The query returned multiple objects when only one was expected.
class UserQuerySet(models.QuerySet):
    """QuerySet for users, with a helper to filter out the anonymous user."""

    def exclude_anonymous(self):
        """Return this queryset without the user whose username field equals the
        configured anonymous user name."""
        anonymous_lookup = {User.USERNAME_FIELD: settings.ANONYMOUS_USER_NAME}
        return self.exclude(**anonymous_lookup)
User queryset
class UserManager(DjangoUserManager):
    """User manager that doesn't assign is_superuser and is_staff"""

    def get_queryset(self):
        """Build the custom ``UserQuerySet`` bound to this manager's model and database."""
        queryset = UserQuerySet(self.model, using=self._db)
        return queryset

    def create_user(self, username, email=None, password=None, **extra_fields):
        """Create a user, passing ``extra_fields`` through untouched
        (no is_superuser / is_staff defaults are added here)."""
        new_user = self._create_user(username, email, password, **extra_fields)
        return new_user

    def exclude_anonymous(self) -> QuerySet:
        """Shortcut for ``get_queryset().exclude_anonymous()``."""
        all_users = self.get_queryset()
        return all_users.exclude_anonymous()
User manager that doesn't assign is_superuser and is_staff
def get_queryset(self):
    """Build the custom ``UserQuerySet`` bound to this manager's model and database."""
    queryset = UserQuerySet(self.model, using=self._db)
    return queryset
Create special user queryset
def create_user(self, username, email=None, password=None, **extra_fields):
    """Create a user, passing ``extra_fields`` through untouched
    (no is_superuser / is_staff defaults are added here)."""
    new_user = self._create_user(username, email, password, **extra_fields)
    return new_user
User manager that doesn't assign is_superuser and is_staff
class User(SerializerModel, AttributesMixin, AbstractUser):
    """authentik User model, based on django's contrib auth user model.

    Permissions are not attached to the user directly: they are held by roles (including
    a per-user "managed" role, see ``get_managed_role()``). Superuser status is derived
    from membership in a group flagged ``is_superuser``.
    """

    # Overwriting PermissionsMixin: permissions are handled by roles.
    # (This knowingly violates the Liskov substitution principle. It is better to fail loudly.)
    user_permissions = None

    uuid = models.UUIDField(default=uuid4, editable=False, unique=True)
    name = models.TextField(help_text=_("User's display name."))
    path = models.TextField(default="users")
    type = models.TextField(choices=UserTypes.choices, default=UserTypes.INTERNAL)

    sources = models.ManyToManyField("Source", through="UserSourceConnection")
    groups = models.ManyToManyField("Group", related_name="users")
    roles = models.ManyToManyField("authentik_rbac.Role", related_name="users", blank=True)
    password_change_date = models.DateTimeField(auto_now_add=True)

    last_updated = models.DateTimeField(auto_now=True)

    objects = UserManager()

    class Meta:
        verbose_name = _("User")
        verbose_name_plural = _("Users")
        permissions = [
            ("reset_user_password", _("Reset Password")),
            ("impersonate", _("Can impersonate other users")),
            ("preview_user", _("Can preview user data sent to providers")),
            ("view_user_applications", _("View applications the user has access to")),
        ]
        indexes = [
            models.Index(fields=["last_login"]),
            models.Index(fields=["password_change_date"]),
            models.Index(fields=["uuid"]),
            models.Index(fields=["path"]),
            models.Index(fields=["type"]),
            models.Index(fields=["date_joined"]),
            models.Index(fields=["last_updated"]),
        ]

    def __str__(self):
        return self.username

    @staticmethod
    def default_path() -> str:
        """Get the default user path"""
        return User._meta.get_field("path").default

    def all_groups(self) -> QuerySet[Group]:
        """Recursively get all groups this user is a member of."""
        return self.groups.all().with_ancestors()

    def all_roles(self) -> QuerySet[Role]:
        """Get all roles of this user and all of its groups (recursively)."""
        return Role.objects.filter(Q(users=self) | Q(groups__in=self.all_groups())).distinct()

    def get_managed_role(self, create=False):
        """Return this user's managed role.

        With ``create=True`` the role is fetched or created, and the user is added to it
        when it was newly created. Otherwise the existing role is looked up by name and
        ``None`` is returned when there is none.
        """
        if create:
            name = managed_role_name(self)
            role, created = Role.objects.get_or_create(name=name, managed=name)
            if created:
                role.users.add(self)
            return role
        else:
            return Role.objects.filter(name=managed_role_name(self)).first()

    def get_all_model_perms_on_managed_role(self) -> QuerySet[RoleModelPermission]:
        """Model-level permissions held by the managed role (empty if no such role)."""
        role = self.get_managed_role()
        if not role:
            return RoleModelPermission.objects.none()
        return RoleModelPermission.objects.filter(role=role)

    def get_all_obj_perms_on_managed_role(self) -> QuerySet[RoleObjectPermission]:
        """Object-level permissions held by the managed role (empty if no such role)."""
        role = self.get_managed_role()
        if not role:
            return RoleObjectPermission.objects.none()
        return RoleObjectPermission.objects.filter(role=role)

    def assign_perms_to_managed_role(
        self,
        perms: str | list[str] | Permission | list[Permission],
        obj: models.Model | None = None,
    ):
        """Assign ``perms`` (optionally scoped to ``obj``) to the managed role.

        The role is created on demand; nothing happens when ``perms`` is empty.
        """
        if not perms:
            return
        role = self.get_managed_role(create=True)
        role.assign_perms(perms, obj)

    def remove_perms_from_managed_role(
        self,
        perms: str | list[str] | Permission | list[Permission],
        obj: models.Model | None = None,
    ):
        """Remove ``perms`` (optionally scoped to ``obj``) from the managed role, if any."""
        role = self.get_managed_role()
        if not role:
            return None
        role.remove_perms(perms, obj)

    def remove_all_perms_from_managed_role(self):
        """Delete every model and object permission row of the managed role, if any."""
        role = self.get_managed_role()
        if not role:
            return None
        RoleModelPermission.objects.filter(role=role).delete()
        RoleObjectPermission.objects.filter(role=role).delete()

    def group_attributes(self, request: HttpRequest | None = None) -> dict[str, Any]:
        """Get a dictionary containing the attributes from all groups the user belongs to,
        including the users attributes.

        Merge order (later wins): brand attributes of the request (if present), groups
        ordered by name, then the user's own attributes.
        """
        final_attributes = {}
        if request and hasattr(request, "brand"):
            always_merger.merge(final_attributes, request.brand.attributes)
        for group in self.all_groups().order_by("name"):
            always_merger.merge(final_attributes, group.attributes)
        always_merger.merge(final_attributes, self.attributes)
        return final_attributes

    def app_entitlements(self, app: Application | None) -> QuerySet[ApplicationEntitlement]:
        """Get all entitlements this user has for `app`.

        An entitlement matches through an enabled binding that either targets this user
        or one of their groups (non-negated), or is negated and targets a different
        user or group. Without an application an empty queryset is returned, so callers
        can always chain queryset methods on the result.
        """
        if not app:
            # Return a real (empty) queryset rather than a list: callers chain
            # queryset methods such as values_list() on the result.
            return ApplicationEntitlement.objects.none()
        all_groups = self.all_groups()
        qs = app.applicationentitlement_set.filter(
            Q(
                Q(bindings__user=self) | Q(bindings__group__in=all_groups),
                bindings__negate=False,
            )
            | Q(
                Q(~Q(bindings__user=self), bindings__user__isnull=False)
                | Q(~Q(bindings__group__in=all_groups), bindings__group__isnull=False),
                bindings__negate=True,
            ),
            bindings__enabled=True,
        ).order_by("name")
        return qs

    def app_entitlements_attributes(self, app: Application | None) -> dict:
        """Get a dictionary containing all merged attributes from app entitlements for `app`."""
        final_attributes = {}
        for attrs in self.app_entitlements(app).values_list("attributes", flat=True):
            always_merger.merge(final_attributes, attrs)
        return final_attributes

    @property
    def serializer(self) -> Serializer:
        """Get serializer for this model"""
        from authentik.core.api.users import UserSerializer

        return UserSerializer

    @cached_property
    def is_superuser(self) -> bool:
        """Get superuser status based on membership in a group with superuser status"""
        return self.all_groups().filter(is_superuser=True).exists()

    @property
    def is_staff(self) -> bool:
        """superuser == staff user"""
        return self.is_superuser  # type: ignore

    # TODO: remove this after 2026.
    @property
    def ak_groups(self):
        """This is a proxy for a renamed, deprecated field.

        Every access logs a warning and records a deprecation event, then returns
        ``self.groups``.
        """
        from authentik.events.models import Event

        deprecation = "authentik.core.models.User.ak_groups"
        replacement = "authentik.core.models.User.groups"
        message_logger = (
            f"{deprecation} is deprecated and will be removed in a future version of "
            f"authentik. Please use {replacement} instead."
        )
        message_event = (
            f"{message_logger} This event will not be repeated until it expires (by "
            "default: in 30 days). See authentik logs for every will invocation of this "
            "deprecation."
        )
        stacktrace = traceback.format_stack()
        # The last line is this function, the next-to-last line is its caller
        cause = stacktrace[-2] if len(stacktrace) > 1 else "Unknown, see stacktrace in logs"
        # Use the first double-quoted token of the caller frame as the culprit's name.
        if search := re.search(r'"(.*?)"', cause):
            cause = f"Property mapping or Expression policy named {search.group(1)}"

        LOGGER.warning(
            "deprecation used",
            message=message_logger,
            deprecation=deprecation,
            replacement=replacement,
            cause=cause,
            stacktrace=stacktrace,
        )
        Event.log_deprecation(
            deprecation, message=message_event, cause=cause, replacement=replacement
        )
        return self.groups

    def set_password(self, raw_password, signal=True, sender=None, request=None):
        """Set the password from a raw value and stamp ``password_change_date``.

        For already-saved users, and unless ``signal`` is false, ``password_changed`` is
        sent first (with ``sender`` defaulting to the user itself).
        """
        if self.pk and signal:
            from authentik.core.signals import password_changed

            if not sender:
                sender = self
            password_changed.send(sender=sender, user=self, password=raw_password, request=request)
        self.password_change_date = now()
        return super().set_password(raw_password)

    @staticmethod
    def validate_password_hash(password_hash: str):
        """Validate that the value is a recognized Django password hash."""
        identify_hasher(password_hash)  # Raises ValueError if invalid

    def set_password_from_hash(self, password_hash: str, signal=True, sender=None, request=None):
        """Set password directly from a pre-hashed value.

        Unlike set_password(), this does not hash the input again. The provided value
        must already be a valid Django password hash, and it is stored directly on the
        user after validation.

        Because no raw password is available, downstream password sync integrations
        such as LDAP and Kerberos cannot be updated from this code path.

        Raises ValueError if the hash format is not recognized.
        """
        self.validate_password_hash(password_hash)
        if self.pk and signal:
            from authentik.core.signals import password_hash_changed

            if not sender:
                sender = self
            password_hash_changed.send(sender=sender, user=self, request=request)
        self.password = password_hash
        self.password_change_date = now()

    def check_password(self, raw_password: str) -> bool:
        """
        Return a boolean of whether the raw_password was correct. Handles
        hashing formats behind the scenes.

        Slightly changed version which doesn't send a signal for such internal hash upgrades
        """

        def setter(raw_password):
            self.set_password(raw_password, signal=False)
            # Password hash upgrades shouldn't be considered password changes.
            self._password = None
            # Only the hash itself is persisted.
            self.save(update_fields=["password"])

        return check_password(raw_password, self.password, setter)

    @property
    def uid(self) -> str:
        """Generate a globally unique UID, based on the user ID and the hashed secret key"""
        return sha256(f"{self.id}-{get_unique_identifier()}".encode("ascii")).hexdigest()

    def locale(self, request: HttpRequest | None = None) -> str:
        """Get the locale the user has configured.

        Resolution order: the request's ``LANGUAGE_CODE``, the user's
        ``attributes["settings"]["locale"]``, the request's brand locale, and finally an
        empty string.
        """
        if request and hasattr(request, "LANGUAGE_CODE"):
            return request.LANGUAGE_CODE
        try:
            return self.attributes.get("settings", {}).get("locale", "")

        except Exception as exc:  # noqa
            LOGGER.warning("Failed to get default locale", exc=exc)
        # Same guard as group_attributes(): not every request carries a brand.
        if request and hasattr(request, "brand"):
            return request.brand.locale
        return ""

    @property
    def avatar(self) -> str:
        """Get avatar, depending on authentik.avatar setting"""
        return get_avatar(self)
authentik User model, based on django's contrib auth user model.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@staticmethod
def default_path() -> str:
    """Return the default value declared on the ``path`` model field."""
    path_field = User._meta.get_field("path")
    return path_field.default
Get the default user path
def all_groups(self) -> QuerySet[Group]:
    """Return the user's direct groups expanded with all of their ancestors."""
    direct_groups = self.groups.all()
    return direct_groups.with_ancestors()
Recursively get all groups this user is a member of.
def all_roles(self) -> QuerySet[Role]:
    """Return the distinct roles held directly or via any (recursive) group."""
    held_directly = Q(users=self)
    held_via_group = Q(groups__in=self.all_groups())
    return Role.objects.filter(held_directly | held_via_group).distinct()
Get all roles of this user and all of its groups (recursively).
def get_managed_role(self, create=False):
    """Return this user's managed role.

    Without ``create`` the role is looked up by name and ``None`` is returned when it
    does not exist. With ``create=True`` it is fetched or created, and the user is
    added to it when it was newly created.
    """
    role_name = managed_role_name(self)
    if not create:
        return Role.objects.filter(name=role_name).first()

    managed_role, was_created = Role.objects.get_or_create(name=role_name, managed=role_name)
    if was_created:
        managed_role.users.add(self)
    return managed_role
def group_attributes(self, request: HttpRequest | None = None) -> dict[str, Any]:
    """Merge attributes from the brand, all of the user's groups and the user.

    Layers are merged in this order, so later ones win: the request's brand attributes
    (only if the request carries a brand), every group ordered by name, then the
    user's own attributes.
    """
    merged = {}
    brand_available = request and hasattr(request, "brand")
    if brand_available:
        always_merger.merge(merged, request.brand.attributes)
    groups_by_name = self.all_groups().order_by("name")
    for member_group in groups_by_name:
        always_merger.merge(merged, member_group.attributes)
    always_merger.merge(merged, self.attributes)
    return merged
Get a dictionary containing the attributes from all groups the user belongs to, including the users attributes
def app_entitlements(self, app: Application | None) -> QuerySet[ApplicationEntitlement]:
    """Get all entitlements this user has for `app`.

    An entitlement matches through an enabled binding that either targets this user or
    one of their groups (non-negated), or is negated and targets a different user or
    group. Results are ordered by name.

    Without an application an empty queryset is returned, so callers can always chain
    queryset methods (``values_list()``, ``filter()``, ...) on the result.
    """
    if not app:
        # Return a real (empty) queryset rather than a list to honour the annotation.
        return ApplicationEntitlement.objects.none()
    all_groups = self.all_groups()
    qs = app.applicationentitlement_set.filter(
        Q(
            Q(bindings__user=self) | Q(bindings__group__in=all_groups),
            bindings__negate=False,
        )
        | Q(
            Q(~Q(bindings__user=self), bindings__user__isnull=False)
            | Q(~Q(bindings__group__in=all_groups), bindings__group__isnull=False),
            bindings__negate=True,
        ),
        bindings__enabled=True,
    ).order_by("name")
    return qs
Get all entitlements this user has for app.
def app_entitlements_attributes(self, app: Application | None) -> dict:
    """Get a dictionary containing all merged attributes from app entitlements for `app`.

    Returns an empty dictionary when no application is given.
    """
    final_attributes = {}
    if not app:
        # Without an application there is nothing to merge; return early rather than
        # depending on what app_entitlements() hands back for a missing app.
        return final_attributes
    for attrs in self.app_entitlements(app).values_list("attributes", flat=True):
        always_merger.merge(final_attributes, attrs)
    return final_attributes
Get a dictionary containing all merged attributes from app entitlements for app.
@property
def serializer(self) -> Serializer:
    """Return the serializer class used for this model."""
    # Imported at call time; presumably to avoid a circular import - confirm.
    from authentik.core.api.users import UserSerializer as serializer_class

    return serializer_class
Get serializer for this model
Get superuser status based on membership in a group with superuser status
@property
def is_staff(self) -> bool:
    """Staff status simply mirrors superuser status."""
    superuser_flag = self.is_superuser  # type: ignore
    return superuser_flag
superuser == staff user
@property
def ak_groups(self):
    """Deprecated alias of ``groups``.

    Every access logs a warning and records a deprecation event before the ``groups``
    manager is returned.
    """
    from authentik.events.models import Event

    deprecation = "authentik.core.models.User.ak_groups"
    replacement = "authentik.core.models.User.groups"
    log_text = (
        f"{deprecation} is deprecated and will be removed in a future version of "
        f"authentik. Please use {replacement} instead."
    )
    event_text = (
        f"{log_text} This event will not be repeated until it expires (by "
        "default: in 30 days). See authentik logs for every will invocation of this "
        "deprecation."
    )

    frames = traceback.format_stack()
    # The final frame is this property itself; the one before it is whoever accessed it.
    if len(frames) > 1:
        cause = frames[-2]
    else:
        cause = "Unknown, see stacktrace in logs"
    # Use the first double-quoted token of the caller frame as the culprit's name.
    quoted = re.search(r'"(.*?)"', cause)
    if quoted:
        cause = f"Property mapping or Expression policy named {quoted.group(1)}"

    LOGGER.warning(
        "deprecation used",
        message=log_text,
        deprecation=deprecation,
        replacement=replacement,
        cause=cause,
        stacktrace=frames,
    )
    Event.log_deprecation(
        deprecation,
        message=event_text,
        cause=cause,
        replacement=replacement,
    )
    return self.groups
This is a proxy for a renamed, deprecated field.
def set_password(self, raw_password, signal=True, sender=None, request=None):
    """Set the password from a raw value and stamp ``password_change_date``.

    For already-saved users, and unless ``signal`` is false, ``password_changed`` is
    sent first; ``sender`` defaults to the user itself.
    """
    should_notify = self.pk and signal
    if should_notify:
        from authentik.core.signals import password_changed

        password_changed.send(
            sender=sender if sender else self,
            user=self,
            password=raw_password,
            request=request,
        )
    self.password_change_date = now()
    return super().set_password(raw_password)
@staticmethod
def validate_password_hash(password_hash: str):
    """Check that ``password_hash`` is in a hash format Django recognizes.

    Nothing is returned; an unrecognized value makes ``identify_hasher`` raise
    ``ValueError``.
    """
    identify_hasher(password_hash)
    return None
Validate that the value is a recognized Django password hash.
def set_password_from_hash(self, password_hash: str, signal=True, sender=None, request=None):
    """Store an already-hashed password on the user.

    The value is validated as a recognized Django password hash and then assigned
    as-is; unlike set_password() it is not hashed again. ``password_change_date`` is
    updated as well.

    No raw password exists on this path, so downstream password sync integrations
    such as LDAP and Kerberos cannot be updated from here.

    Raises ValueError if the hash format is not recognized.
    """
    self.validate_password_hash(password_hash)

    should_notify = self.pk and signal
    if should_notify:
        from authentik.core.signals import password_hash_changed

        password_hash_changed.send(
            sender=sender if sender else self,
            user=self,
            request=request,
        )

    self.password = password_hash
    self.password_change_date = now()
Set password directly from a pre-hashed value.
Unlike set_password(), this does not hash the input again. The provided value must already be a valid Django password hash, and it is stored directly on the user after validation.
Because no raw password is available, downstream password sync integrations such as LDAP and Kerberos cannot be updated from this code path.
Raises ValueError if the hash format is not recognized.
def check_password(self, raw_password: str) -> bool:
    """
    Tell whether ``raw_password`` matches the stored hash, upgrading the stored hash
    when the hasher asks for it.

    Differs from the stock implementation in that such internal hash upgrades do not
    send the password-changed signal.
    """

    def upgrade_hash(plaintext):
        # Re-hash silently: a hash upgrade is not a password change.
        self.set_password(plaintext, signal=False)
        self._password = None
        # Only the hash itself is persisted.
        self.save(update_fields=["password"])

    stored_hash = self.password
    return check_password(raw_password, stored_hash, upgrade_hash)
Return a boolean of whether the raw_password was correct. Handles hashing formats behind the scenes.
Slightly changed version which doesn't send a signal for such internal hash upgrades
@property
def uid(self) -> str:
    """Return the SHA-256 hex digest of "<user id>-<unique identifier>"."""
    seed = f"{self.id}-{get_unique_identifier()}"
    digest = sha256(seed.encode("ascii"))
    return digest.hexdigest()
Generate a globally unique UID, based on the user ID and the hashed secret key
def locale(self, request: HttpRequest | None = None) -> str:
    """Get the locale the user has configured.

    Resolution order:

    1. ``request.LANGUAGE_CODE`` when the request has one,
    2. the user's ``attributes["settings"]["locale"]`` (empty string if unset),
    3. the locale of the request's brand, if the attribute lookup failed,
    4. an empty string.
    """
    if request and hasattr(request, "LANGUAGE_CODE"):
        return request.LANGUAGE_CODE
    try:
        return self.attributes.get("settings", {}).get("locale", "")

    except Exception as exc:  # noqa
        # Best effort: malformed attributes must not break locale resolution.
        LOGGER.warning("Failed to get default locale", exc=exc)
    # Only reached when the attribute lookup failed. Not every request carries a
    # brand, so guard the access instead of raising AttributeError.
    if request and hasattr(request, "brand"):
        return request.brand.locale
    return ""
Get the locale the user has configured
@property
def avatar(self) -> str:
    """Return whatever ``get_avatar`` resolves for this user."""
    resolved_avatar = get_avatar(self)
    return resolved_avatar
Get avatar, depending on authentik.avatar setting
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
The requested object does not exist
The query returned multiple objects when only one was expected.
class Provider(SerializerModel):
    """Application-independent Provider instance. For example SAML2 Remote, OAuth2 Application"""

    name = models.TextField(unique=True)

    # Deleting the referenced flow only clears this reference (SET_NULL).
    authentication_flow = models.ForeignKey(
        "authentik_flows.Flow",
        null=True,
        on_delete=models.SET_NULL,
        help_text=_(
            "Flow used for authentication when the associated application is accessed by an "
            "un-authenticated user."
        ),
        related_name="provider_authentication",
    )
    authorization_flow = models.ForeignKey(
        "authentik_flows.Flow",
        # Set to cascade even though null is allowed, since most providers
        # still require an authorization flow set
        on_delete=models.CASCADE,
        null=True,
        help_text=_("Flow used when authorizing this provider."),
        related_name="provider_authorization",
    )
    # SET_DEFAULT combined with default=None: deleting the flow resets this reference to NULL.
    invalidation_flow = models.ForeignKey(
        "authentik_flows.Flow",
        on_delete=models.SET_DEFAULT,
        default=None,
        null=True,
        help_text=_("Flow used ending the session from a provider."),
        related_name="provider_invalidation",
    )

    property_mappings = models.ManyToManyField("PropertyMapping", default=None, blank=True)

    # Reverse accessor on Application is `backchannel_providers`; deleting the
    # application deletes the providers attached through this field (CASCADE).
    backchannel_application = models.ForeignKey(
        "Application",
        default=None,
        null=True,
        on_delete=models.CASCADE,
        help_text=_(
            "Accessed from applications; optional backchannel providers for protocols "
            "like LDAP and SCIM."
        ),
        related_name="backchannel_providers",
    )

    is_backchannel = models.BooleanField(default=False)

    # InheritanceManager (model_utils) as the default manager, so queries can
    # resolve Provider subclasses.
    objects = InheritanceManager()

    @property
    def launch_url(self) -> str | None:
        """URL to this provider and initiate authorization for the user.
        Can return None for providers that are not URL-based"""
        return None

    @property
    def icon_url(self) -> str | None:
        """URL of an icon for this provider. The base implementation returns None."""
        return None

    @property
    def component(self) -> str:
        """Return component used to edit this object"""
        raise NotImplementedError

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        raise NotImplementedError

    def __str__(self) -> str:
        return str(self.name)
Application-independent Provider instance. For example SAML2 Remote, OAuth2 Application
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def launch_url(self) -> str | None:
    """URL to this provider and initiate authorization for the user.
    Can return None for providers that are not URL-based"""
    # Base implementation: no launch URL.
    return None
URL to this provider and initiate authorization for the user. Can return None for providers that are not URL-based
@property
def component(self) -> str:
    """Return component used to edit this object

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
Return component used to edit this object
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class BackchannelProvider(Provider):
    """Base class for providers that augment other providers, for example LDAP and SCIM.
    Multiple of these providers can be configured per application, they may not use the application
    slug in URLs as an application may have multiple instances of the same
    type of Backchannel provider

    They can use the application's policies and metadata"""

    @property
    def component(self) -> str:
        """Return component used to edit this object. Not implemented on this base class."""
        raise NotImplementedError

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model. Not implemented on this base class."""
        raise NotImplementedError

    class Meta:
        # Abstract: this class only serves as a base for concrete provider models.
        abstract = True
Base class for providers that augment other providers, for example LDAP and SCIM. Multiple of these providers can be configured per application, they may not use the application slug in URLs as an application may have multiple instances of the same type of Backchannel provider
They can use the application's policies and metadata
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- Provider
- name
- authentication_flow
- invalidation_flow
- property_mappings
- backchannel_application
- is_backchannel
- objects
- launch_url
- icon_url
- DoesNotExist
- MultipleObjectsReturned
- authentication_flow_id
- invalidation_flow_id
- backchannel_application_id
- id
- application
- outpost_set
- oauth2provider
- ldapprovider
- racprovider
- radiusprovider
- samlprovider
- scimprovider
- googleworkspaceprovider
- microsoftentraprovider
- ssfprovider
class ApplicationQuerySet(QuerySet):
    """QuerySet for applications that can eagerly load the related provider."""

    def with_provider(self) -> QuerySet[Application]:
        """Return a queryset that joins the provider and every provider subclass.

        For each subclass path the subclass row itself, its ``application`` and its
        ``backchannel_application`` are selected, and its ``property_mappings`` are
        prefetched, so casted provider instances have that data available.
        """
        queryset = self.select_related("provider")
        subclass_paths = Provider.objects.get_queryset()._get_subclasses_recurse(Provider)
        for subclass_path in subclass_paths:
            prefix = f"provider__{subclass_path}"
            queryset = (
                queryset.select_related(prefix)
                .prefetch_related(f"{prefix}__property_mappings")
                .select_related(f"{prefix}__application")
                .select_related(f"{prefix}__backchannel_application")
            )
        return queryset
Represent a lazy database lookup for a set of objects.
def with_provider(self) -> QuerySet[Application]:
    """Return a queryset that joins the provider and every provider subclass.

    For each subclass path the subclass row itself, its ``application`` and its
    ``backchannel_application`` are selected, and its ``property_mappings`` are
    prefetched, so casted provider instances have that data available.
    """
    queryset = self.select_related("provider")
    subclass_paths = Provider.objects.get_queryset()._get_subclasses_recurse(Provider)
    for subclass_path in subclass_paths:
        prefix = f"provider__{subclass_path}"
        queryset = (
            queryset.select_related(prefix)
            .prefetch_related(f"{prefix}__property_mappings")
            .select_related(f"{prefix}__application")
            .select_related(f"{prefix}__backchannel_application")
        )
    return queryset
class Application(SerializerModel, PolicyBindingModel):
    """Every Application which uses authentik for authentication/identification/authorization
    needs an Application record. Other authentication types can subclass this Model to
    add custom fields and other properties"""

    name = models.TextField(help_text=_("Application's display Name."))
    slug = models.TextField(
        validators=[validate_slug],
        help_text=_("Internal application name, used in URLs."),
        unique=True,
    )
    group = models.TextField(blank=True, default="")

    # SET_DEFAULT combined with default=None: deleting the provider resets this
    # reference to NULL instead of deleting the application.
    provider = models.OneToOneField(
        "Provider", null=True, blank=True, default=None, on_delete=models.SET_DEFAULT
    )

    # When set, takes precedence over the provider's launch URL (see get_launch_url).
    meta_launch_url = models.TextField(
        default="", blank=True, validators=[DomainlessFormattedURLValidator()]
    )

    open_in_new_tab = models.BooleanField(
        default=False, help_text=_("Open launch URL in a new browser tab or window.")
    )

    meta_icon = FileField(default="", blank=True)
    meta_description = models.TextField(default="", blank=True)
    meta_publisher = models.TextField(default="", blank=True)
    meta_hide = models.BooleanField(
        default=False, help_text=_("Hide this application from the user's My applications page.")
    )

    objects = ApplicationQuerySet.as_manager()

    # Reserved slugs that would clash with OAuth2 provider endpoints
    reserved_slugs = ["authorize", "token", "device", "userinfo", "introspect", "revoke"]

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import — TODO confirm).
        from authentik.core.api.applications import ApplicationSerializer

        return ApplicationSerializer

    @property
    def get_meta_icon(self) -> str | None:
        """Get the URL to the App Icon image"""
        if not self.meta_icon:
            return None

        return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon)

    @property
    def get_meta_icon_themed_urls(self) -> dict[str, str] | None:
        """Get themed URLs for meta_icon if it contains %(theme)s"""
        if not self.meta_icon:
            return None

        return get_file_manager(FileUsage.MEDIA).themed_urls(self.meta_icon)

    def get_launch_url(self, user: User | None = None, user_data: dict | None = None) -> str | None:
        """Get launch URL if set, otherwise attempt to get launch URL based on provider.

        Args:
            user: User instance for formatting the URL
            user_data: Pre-serialized user data to avoid re-serialization (performance optimization)
        """
        from authentik.core.api.users import UserSerializer

        url = None
        if self.meta_launch_url:
            url = self.meta_launch_url
        elif provider := self.get_provider():
            url = provider.launch_url
        if user and url:
            try:
                # Use pre-serialized data if available, otherwise serialize now
                if user_data is None:
                    user_data = UserSerializer(instance=user).data
                # %-style formatting with the serialized user data as the mapping
                return url % user_data
            except Exception as exc:  # noqa
                # Best-effort: on any failure fall back to the unformatted URL
                LOGGER.warning("Failed to format launch url", exc=exc)
                return url
        return url

    def get_provider(self) -> Provider | None:
        """Get casted provider instance. Needs Application queryset with_provider"""
        # hasattr rather than a None check, since None is itself a valid cached result
        if hasattr(self, "_cached_provider"):
            return self._cached_provider
        if not self.provider:
            self._cached_provider = None
            return None
        self._cached_provider = get_deepest_child(self.provider)
        return self._cached_provider

    def backchannel_provider_for[T: Provider](self, provider_type: type[T], **kwargs) -> T | None:
        """Get Backchannel provider for a specific type"""
        # Keep only backchannel providers that have a row for the requested
        # subclass; extra kwargs are passed to filter() as additional lookups.
        provider: BackchannelProvider | None = self.backchannel_providers.filter(
            **{f"{provider_type._meta.model_name}__isnull": False},
            **kwargs,
        ).first()
        # Return the subclass instance reached through the attribute named after its model
        return getattr(provider, provider_type._meta.model_name) if provider else None

    def __str__(self) -> str:
        return str(self.name)

    class Meta:
        verbose_name = _("Application")
        verbose_name_plural = _("Applications")
Every Application which uses authentik for authentication/identification/authorization needs an Application record. Other authentication types can subclass this Model to add custom fields and other properties
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model"""
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import — TODO confirm).
    from authentik.core.api.applications import ApplicationSerializer

    return ApplicationSerializer
Get serializer for this model
781 @property 782 def get_meta_icon(self) -> str | None: 783 """Get the URL to the App Icon image""" 784 if not self.meta_icon: 785 return None 786 787 return get_file_manager(FileUsage.MEDIA).file_url(self.meta_icon)
Get the URL to the App Icon image
789 @property 790 def get_meta_icon_themed_urls(self) -> dict[str, str] | None: 791 """Get themed URLs for meta_icon if it contains %(theme)s""" 792 if not self.meta_icon: 793 return None 794 795 return get_file_manager(FileUsage.MEDIA).themed_urls(self.meta_icon)
Get themed URLs for meta_icon if it contains %(theme)s
def get_launch_url(self, user: User | None = None, user_data: dict | None = None) -> str | None:
    """Resolve the URL used to launch this application.

    ``meta_launch_url`` wins when it is set; otherwise the casted provider's
    ``launch_url`` is used. When both a user and a URL are available, the URL is
    formatted with the user's serialized data; if that fails, a warning is
    logged and the unformatted URL is returned.

    Args:
        user: User instance for formatting the URL
        user_data: Pre-serialized user data, used instead of serializing ``user`` again
    """
    from authentik.core.api.users import UserSerializer

    if self.meta_launch_url:
        launch_url = self.meta_launch_url
    elif casted := self.get_provider():
        launch_url = casted.launch_url
    else:
        launch_url = None

    # Nothing to format without both a user and a URL
    if not (user and launch_url):
        return launch_url

    try:
        # Serialize the user only when no pre-serialized data was handed in
        context = UserSerializer(instance=user).data if user_data is None else user_data
        return launch_url % context
    except Exception as exc:  # noqa
        LOGGER.warning("Failed to format launch url", exc=exc)
        return launch_url
Get launch URL if set, otherwise attempt to get launch URL based on provider.
Args: user: User instance for formatting the URL. user_data: Pre-serialized user data to avoid re-serialization (performance optimization).
def get_provider(self) -> Provider | None:
    """Return the provider cast to its most specific subclass, or None without a provider.

    The result (including None) is stored on the instance as ``_cached_provider``
    and reused on later calls. Needs Application queryset with_provider.
    """
    # The attribute's presence marks a filled cache, so a cached None is honoured too
    if not hasattr(self, "_cached_provider"):
        self._cached_provider = get_deepest_child(self.provider) if self.provider else None
    return self._cached_provider
Get casted provider instance. Needs Application queryset with_provider
832 def backchannel_provider_for[T: Provider](self, provider_type: type[T], **kwargs) -> T | None: 833 """Get Backchannel provider for a specific type""" 834 provider: BackchannelProvider | None = self.backchannel_providers.filter( 835 **{f"{provider_type._meta.model_name}__isnull": False}, 836 **kwargs, 837 ).first() 838 return getattr(provider, provider_type._meta.model_name) if provider else None
Get Backchannel provider for a specific type
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class ApplicationEntitlement(AttributesMixin, SerializerModel, PolicyBindingModel):
    """Application-scoped entitlement to control authorization in an application"""

    name = models.TextField()

    # Entitlements are deleted together with their application (CASCADE).
    app = models.ForeignKey(Application, on_delete=models.CASCADE)

    class Meta:
        verbose_name = _("Application Entitlement")
        verbose_name_plural = _("Application Entitlements")
        # An entitlement name may only be used once per application.
        unique_together = (("app", "name"),)

    def __str__(self) -> str:
        return f"Application Entitlement {self.name} for app {self.app_id}"

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import — TODO confirm).
        from authentik.core.api.application_entitlements import ApplicationEntitlementSerializer

        return ApplicationEntitlementSerializer

    def supported_policy_binding_targets(self) -> list[str]:
        """Return the binding targets supported by entitlements: "group" and "user"."""
        return ["group", "user"]
Application-scoped entitlement to control authorization in an application
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model"""
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import — TODO confirm).
    from authentik.core.api.application_entitlements import ApplicationEntitlementSerializer

    return ApplicationEntitlementSerializer
Get serializer for this model
Return the list of objects that can be bound to this object.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class SourceUserMatchingModes(models.TextChoices):
    """Different modes a source can handle new/returning users"""

    # Each member is a (stored value, translatable label) pair.
    IDENTIFIER = "identifier", _("Use the source-specific identifier")
    EMAIL_LINK = (
        "email_link",
        _(
            "Link to a user with identical email address. Can have security implications "
            "when a source doesn't validate email addresses."
        ),
    )
    EMAIL_DENY = (
        "email_deny",
        _(
            "Use the user's email address, but deny enrollment when the email address already "
            "exists."
        ),
    )
    USERNAME_LINK = (
        "username_link",
        _(
            "Link to a user with identical username. Can have security implications "
            "when a username is used with another source."
        ),
    )
    USERNAME_DENY = (
        "username_deny",
        _("Use the user's username, but deny enrollment when the username already exists."),
    )
Different modes a source can handle new/returning users
class SourceGroupMatchingModes(models.TextChoices):
    """Different modes a source can handle new/returning groups"""

    # Each member is a (stored value, translatable label) pair.
    IDENTIFIER = "identifier", _("Use the source-specific identifier")
    NAME_LINK = (
        "name_link",
        _(
            "Link to a group with identical name. Can have security implications "
            "when a group name is used with another source."
        ),
    )
    NAME_DENY = (
        "name_deny",
        _("Use the group name, but deny enrollment when the name already exists."),
    )
Different modes a source can handle new/returning groups
class Source(ManagedModel, SerializerModel, PolicyBindingModel):
    """Base Authentication source, i.e. an OAuth Provider, SAML Remote or LDAP Server"""

    # Value of `managed` that marks the inbuilt source. Several members below
    # return a neutral default for it instead of raising NotImplementedError.
    MANAGED_INBUILT = "goauthentik.io/sources/inbuilt"

    name = models.TextField(help_text=_("Source's display Name."))
    slug = models.TextField(
        validators=[validate_slug],
        help_text=_("Internal source name, used in URLs."),
        unique=True,
    )

    # %-style template; rendered by get_user_path() with the key "slug".
    user_path_template = models.TextField(default="goauthentik.io/sources/%(slug)s")

    enabled = models.BooleanField(default=True)
    promoted = models.BooleanField(
        default=False,
        help_text=_(
            "When enabled, this source will be displayed as a prominent button on the "
            "login page, instead of a small icon."
        ),
    )
    user_property_mappings = models.ManyToManyField(
        "PropertyMapping", default=None, blank=True, related_name="source_userpropertymappings_set"
    )
    group_property_mappings = models.ManyToManyField(
        "PropertyMapping", default=None, blank=True, related_name="source_grouppropertymappings_set"
    )

    # An empty value means "no icon"; the icon helpers below return None then.
    icon = FileField(blank=True, default="")

    authentication_flow = models.ForeignKey(
        "authentik_flows.Flow",
        blank=True,
        null=True,
        default=None,
        on_delete=models.SET_NULL,
        help_text=_("Flow to use when authenticating existing users."),
        related_name="source_authentication",
    )
    enrollment_flow = models.ForeignKey(
        "authentik_flows.Flow",
        blank=True,
        null=True,
        default=None,
        on_delete=models.SET_NULL,
        help_text=_("Flow to use when enrolling new users."),
        related_name="source_enrollment",
    )

    user_matching_mode = models.TextField(
        choices=SourceUserMatchingModes.choices,
        default=SourceUserMatchingModes.IDENTIFIER,
        help_text=_(
            "How the source determines if an existing user should be authenticated or "
            "a new user enrolled."
        ),
    )
    group_matching_mode = models.TextField(
        choices=SourceGroupMatchingModes.choices,
        default=SourceGroupMatchingModes.IDENTIFIER,
        help_text=_(
            "How the source determines if an existing group should be used or a new group created."
        ),
    )

    # NOTE(review): model_utils InheritanceManager; presumably used so queries
    # can yield the concrete source subclasses - confirm against callers.
    objects = InheritanceManager()

    def get_icon_url(self, request=None, use_cache: bool = True) -> str | None:
        """Get the URL to the source icon.

        Returns None when no icon is set; otherwise the URL is resolved by the
        media file manager, forwarding `request` and `use_cache`."""
        if not self.icon:
            return None
        return get_file_manager(FileUsage.MEDIA).file_url(self.icon, request, use_cache=use_cache)

    @property
    def icon_url(self) -> str | None:
        """Get the URL to the source icon"""
        return self.get_icon_url()

    def get_icon_themed_urls(
        self,
        request=None,
        use_cache: bool = True,
    ) -> dict[str, str] | None:
        """Get themed URLs for icon if it contains %(theme)s."""
        if not self.icon:
            return None
        return get_file_manager(FileUsage.MEDIA).themed_urls(
            self.icon,
            request,
            use_cache=use_cache,
        )

    @property
    def icon_themed_urls(self) -> dict[str, str] | None:
        """Get themed icon URLs using the defaults of get_icon_themed_urls()"""
        return self.get_icon_themed_urls()

    def get_user_path(self) -> str:
        """Get user path, fallback to default for formatting errors"""
        try:
            return self.user_path_template % {
                "slug": self.slug,
            }

        # Deliberately broad: any failure while rendering the template is
        # logged and replaced by the default user path.
        except Exception as exc:  # noqa
            LOGGER.warning("Failed to template user path", exc=exc, source=self)
            return User.default_path()

    @property
    def component(self) -> str:
        """Return component used to edit this object"""
        # Only the inbuilt source is handled here; other sources must override.
        if self.managed == self.MANAGED_INBUILT:
            return ""
        raise NotImplementedError

    @property
    def property_mapping_type(self) -> type[PropertyMapping]:
        """Return property mapping type used by this object"""
        if self.managed == self.MANAGED_INBUILT:
            # Imported at call time; PropertyMapping is declared further down
            # in this same module.
            from authentik.core.models import PropertyMapping

            return PropertyMapping
        raise NotImplementedError

    def ui_login_button(self, request: HttpRequest) -> UILoginButton | None:
        """If source uses a http-based flow, return UI Information about the login
        button. If source doesn't use http-based flow, return None."""
        return None

    def ui_user_settings(self) -> UserSettingSerializer | None:
        """Entrypoint to integrate with User settings. Can either return None if no
        user settings are available, or UserSettingSerializer."""
        return None

    def get_base_user_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]:
        """Get base properties for a user to build final properties upon."""
        if self.managed == self.MANAGED_INBUILT:
            return {}
        raise NotImplementedError

    def get_base_group_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]:
        """Get base properties for a group to build final properties upon."""
        if self.managed == self.MANAGED_INBUILT:
            return {}
        raise NotImplementedError

    def __str__(self):
        return str(self.name)

    class Meta:
        # Single-column indexes on slug, name and enabled.
        indexes = [
            models.Index(
                fields=[
                    "slug",
                ]
            ),
            models.Index(
                fields=[
                    "name",
                ]
            ),
            models.Index(
                fields=[
                    "enabled",
                ]
            ),
        ]
Base Authentication source, i.e. an OAuth Provider, SAML Remote or LDAP Server
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
989 def get_icon_url(self, request=None, use_cache: bool = True) -> str | None: 990 """Get the URL to the source icon.""" 991 if not self.icon: 992 return None 993 return get_file_manager(FileUsage.MEDIA).file_url(self.icon, request, use_cache=use_cache)
Get the URL to the source icon.
995 @property 996 def icon_url(self) -> str | None: 997 """Get the URL to the source icon""" 998 return self.get_icon_url()
Get the URL to the source icon
1000 def get_icon_themed_urls( 1001 self, 1002 request=None, 1003 use_cache: bool = True, 1004 ) -> dict[str, str] | None: 1005 """Get themed URLs for icon if it contains %(theme)s.""" 1006 if not self.icon: 1007 return None 1008 return get_file_manager(FileUsage.MEDIA).themed_urls( 1009 self.icon, 1010 request, 1011 use_cache=use_cache, 1012 )
Get themed URLs for icon if it contains %(theme)s.
def get_user_path(self) -> str:
    """Render `user_path_template` with this source's slug.

    Any error raised while formatting is logged as a warning and the default
    user path is returned instead."""
    template_context = {"slug": self.slug}
    try:
        rendered = self.user_path_template % template_context
    except Exception as exc:  # noqa
        LOGGER.warning("Failed to template user path", exc=exc, source=self)
        return User.default_path()
    return rendered
Get user path, fallback to default for formatting errors
@property
def component(self) -> str:
    """Name of the UI component used to edit this object.

    Only the inbuilt source is handled here (empty string); every other
    source is expected to override this and gets NotImplementedError."""
    if self.managed != self.MANAGED_INBUILT:
        raise NotImplementedError
    return ""
Return component used to edit this object
@property
def property_mapping_type(self) -> type[PropertyMapping]:
    """Property mapping class used by this object.

    The inbuilt source uses the base PropertyMapping; every other source is
    expected to override this and gets NotImplementedError."""
    if self.managed != self.MANAGED_INBUILT:
        raise NotImplementedError

    # Resolved at call time rather than at class-definition time.
    from authentik.core.models import PropertyMapping

    return PropertyMapping
Return property mapping type used by this object
def ui_user_settings(self) -> UserSettingSerializer | None:
    """Entrypoint to integrate with User settings. Can either return None if no
    user settings are available, or UserSettingSerializer.

    This base implementation always returns None."""
    return None
Entrypoint to integrate with User settings. Can either return None if no user settings are available, or UserSettingSerializer.
1055 def get_base_user_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]: 1056 """Get base properties for a user to build final properties upon.""" 1057 if self.managed == self.MANAGED_INBUILT: 1058 return {} 1059 raise NotImplementedError
Get base properties for a user to build final properties upon.
1061 def get_base_group_properties(self, **kwargs) -> dict[str, Any | dict[str, Any]]: 1062 """Get base properties for a group to build final properties upon.""" 1063 if self.managed == self.MANAGED_INBUILT: 1064 return {} 1065 raise NotImplementedError
Get base properties for a group to build final properties upon.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class UserSourceConnection(SerializerModel, CreatedUpdatedModel):
    """Connection between User and Source."""

    # Deleting either the user or the source deletes the connection as well.
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    source = models.ForeignKey(Source, on_delete=models.CASCADE)
    # NOTE(review): presumably the user's identifier on the source side - confirm.
    identifier = models.TextField()

    objects = InheritanceManager()

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Not provided by this base class.
        raise NotImplementedError

    def __str__(self) -> str:
        return f"User-source connection (user={self.user_id}, source={self.source_id})"

    class Meta:
        # At most one connection per (user, source) pair.
        unique_together = (("user", "source"),)
        indexes = (
            models.Index(fields=("identifier",)),
            models.Index(fields=("source", "identifier")),
        )
Connection between User and Source.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model

    This base implementation always raises NotImplementedError."""
    raise NotImplementedError
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class GroupSourceConnection(SerializerModel, CreatedUpdatedModel):
    """Connection between Group and Source."""

    # Deleting either the group or the source deletes the connection as well.
    group = models.ForeignKey(Group, on_delete=models.CASCADE)
    source = models.ForeignKey(Source, on_delete=models.CASCADE)
    # NOTE(review): presumably the group's identifier on the source side - confirm.
    identifier = models.TextField()

    objects = InheritanceManager()

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Not provided by this base class.
        raise NotImplementedError

    def __str__(self) -> str:
        return f"Group-source connection (group={self.group_id}, source={self.source_id})"

    class Meta:
        # At most one connection per (group, source) pair.
        # NOTE(review): unlike UserSourceConnection, no index on `identifier`
        # is declared here - confirm whether that is intentional.
        unique_together = (("group", "source"),)
Connection between Group and Source.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model

    This base implementation always raises NotImplementedError."""
    raise NotImplementedError
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class ExpiringManager(Manager):
    """Manager for expiring objects which hides expired objects by default"""

    def get_queryset(self):
        """Return all rows except those that are expiring and whose expiry is in the past"""
        unfiltered = QuerySet(self.model, using=self._db)
        return unfiltered.exclude(expires__lt=now(), expiring=True)

    def including_expired(self):
        """Return an unfiltered queryset that still contains expired objects"""
        return QuerySet(self.model, using=self._db)
Manager for expiring objects which filters out expired objects by default
class ExpiringModel(models.Model):
    """Base Model which can expire, and is automatically cleaned up."""

    # `expires` may be NULL; together with `expiring=True` (the defaults) this
    # describes an object without a concrete expiry timestamp.
    expires = models.DateTimeField(default=None, null=True)
    expiring = models.BooleanField(default=True)

    # The default manager hides objects that are already expired.
    objects = ExpiringManager()

    class Meta:
        abstract = True
        indexes = [
            models.Index(fields=["expires"]),
            models.Index(fields=["expiring"]),
            models.Index(fields=["expiring", "expires"]),
        ]

    def expire_action(self, *args, **kwargs):
        """Handler which is called when this object is expired. By
        default the object is deleted. This is less efficient compared
        to bulk deleting objects, but classes like Token() need to change
        values instead of being deleted."""
        try:
            return self.delete(*args, **kwargs)
        except self.DoesNotExist:
            # Object has already been deleted, so this should be fine
            return None

    @classmethod
    def filter_not_expired(cls, **kwargs) -> QuerySet[Self]:
        """Filter for tokens which are not expired yet or are not expiring,
        and match filters in `kwargs`

        Deprecated: every call logs a deprecation event. Expired objects that
        match `kwargs` are deleted before the default lookup is returned."""
        from authentik.events.models import Event

        deprecation_id = f"{class_to_path(cls)}.filter_not_expired"

        Event.log_deprecation(
            deprecation_id,
            message=(
                ".filter_not_expired() is deprecated as the default lookup now excludes "
                "expired objects."
            ),
        )

        for obj in (
            cls.objects.including_expired()
            .filter(**kwargs)
            .filter(Q(expires__lt=now(), expiring=True))
        ):
            obj.delete()
        return cls.objects.filter(**kwargs)

    @property
    def is_expired(self) -> bool:
        """Check if token is expired yet.

        Returns False for non-expiring objects and for objects without an
        expiry timestamp."""
        # `expires` is nullable and defaults to None; comparing None with a
        # datetime raises TypeError. Treat a missing expiry as "not expired",
        # which matches the manager's exclude(expires__lt=...) lookup.
        if not self.expiring or self.expires is None:
            return False
        return now() > self.expires
Base Model which can expire, and is automatically cleaned up.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
def expire_action(self, *args, **kwargs):
    """Hook invoked once this object has expired.

    The default is to delete the object, forwarding all arguments to
    `delete()`. Deleting one by one is slower than a bulk delete, but
    subclasses such as Token() need to modify values rather than be removed.
    Returns the result of `delete()`, or None if the object was already gone."""
    try:
        outcome = self.delete(*args, **kwargs)
    except self.DoesNotExist:
        # Already deleted elsewhere; nothing left to do.
        return None
    return outcome
Handler which is called when this object is expired. By default the object is deleted. This is less efficient compared to bulk deleting objects, but classes like Token() need to change values instead of being deleted.
@classmethod
def filter_not_expired(cls, **kwargs) -> QuerySet[Self]:
    """Filter for objects which are not expired yet or are not expiring,
    and match filters in `kwargs`

    Deprecated: every call logs a deprecation event. Expired objects that
    match `kwargs` are deleted before the default lookup is returned."""
    from authentik.events.models import Event

    Event.log_deprecation(
        f"{class_to_path(cls)}.filter_not_expired",
        message=(
            ".filter_not_expired() is deprecated as the default lookup now excludes "
            "expired objects."
        ),
    )

    # Remove matching objects that have already expired, one at a time.
    matching = cls.objects.including_expired().filter(**kwargs)
    stale = matching.filter(Q(expires__lt=now(), expiring=True))
    for stale_obj in stale:
        stale_obj.delete()
    return cls.objects.filter(**kwargs)
Filter for tokens which are not expired yet or are not expiring,
and match filters in kwargs
class TokenIntents(models.TextChoices):
    """Intents a Token can be created for."""

    # Token.expire_action deletes expired verification, recovery and
    # app-password tokens; tokens with any other intent get a new key instead.

    # Single use token
    INTENT_VERIFICATION = "verification"

    # Allow access to API
    INTENT_API = "api"

    # Recovery use for the recovery app
    INTENT_RECOVERY = "recovery"

    # App-specific passwords
    INTENT_APP_PASSWORD = "app_password"  # nosec
Intents a Token can be created for.
class Token(SerializerModel, ManagedModel, ExpiringModel):
    """Token used to authenticate the User for API Access or confirm another Stage like Email."""

    token_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
    identifier = models.SlugField(max_length=255, unique=True)
    key = models.TextField(default=default_token_key)
    intent = models.TextField(
        choices=TokenIntents.choices, default=TokenIntents.INTENT_VERIFICATION
    )
    user = models.ForeignKey("User", on_delete=models.CASCADE, related_name="+")
    description = models.TextField(default="", blank=True)

    class Meta:
        verbose_name = _("Token")
        verbose_name_plural = _("Tokens")
        # Indexes of the abstract ExpiringModel plus the token lookups.
        indexes = ExpiringModel.Meta.indexes + [
            models.Index(fields=["identifier"]),
            models.Index(fields=["key"]),
        ]
        permissions = [
            ("view_token_key", _("View token's key")),
            ("set_token_key", _("Set a token's key")),
        ]

    def __str__(self):
        label = f"{self.identifier}"
        if not self.expiring:
            return label
        return label + f" (expires={self.expires})"

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for tokens, imported at call time"""
        from authentik.core.api.tokens import TokenSerializer

        return TokenSerializer

    def expire_action(self, *args, **kwargs):
        """Hook invoked once this token has expired.

        Recovery, verification and app-password tokens go through the parent
        handler. Any other token keeps its row but receives a fresh key and
        expiry, and a SECRET_ROTATE event is saved."""
        from authentik.events.models import Event, EventAction

        removed_on_expiry = (
            TokenIntents.INTENT_RECOVERY,
            TokenIntents.INTENT_VERIFICATION,
            TokenIntents.INTENT_APP_PASSWORD,
        )
        if self.intent in removed_on_expiry:
            super().expire_action(*args, **kwargs)
            return

        self.key = default_token_key()
        self.expires = default_token_duration()
        self.save(*args, **kwargs)
        rotation_event = Event.new(
            action=EventAction.SECRET_ROTATE,
            token=self,
            message=f"Token {self.identifier}'s secret was rotated.",
        )
        rotation_event.save()
Token used to authenticate the User for API Access or confirm another Stage like Email.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model"""
    # NOTE(review): imported at call time, presumably to avoid a circular
    # import between the models and API modules - confirm.
    from authentik.core.api.tokens import TokenSerializer

    return TokenSerializer
Get serializer for this model
def expire_action(self, *args, **kwargs):
    """Hook invoked once this token has expired.

    Recovery, verification and app-password tokens go through the parent
    handler. Any other token keeps its row but receives a fresh key and
    expiry, and a SECRET_ROTATE event is saved."""
    from authentik.events.models import Event, EventAction

    removed_on_expiry = (
        TokenIntents.INTENT_RECOVERY,
        TokenIntents.INTENT_VERIFICATION,
        TokenIntents.INTENT_APP_PASSWORD,
    )
    if self.intent in removed_on_expiry:
        super().expire_action(*args, **kwargs)
        return

    # Rotate instead of delete: new secret, new expiry, then persist.
    self.key = default_token_key()
    self.expires = default_token_duration()
    self.save(*args, **kwargs)
    rotation_event = Event.new(
        action=EventAction.SECRET_ROTATE,
        token=self,
        message=f"Token {self.identifier}'s secret was rotated.",
    )
    rotation_event.save()
Handler which is called when this object is expired.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class PropertyMapping(SerializerModel, ManagedModel):
    """User-defined key -> x mapping which can be used by providers to expose extra data.

    The mapping stores an ``expression`` string which ``evaluate`` runs through
    ``PropertyMappingEvaluator``.
    """

    pm_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
    name = models.TextField(unique=True)
    expression = models.TextField()

    objects = InheritanceManager()

    class Meta:
        verbose_name = _("Property Mapping")
        verbose_name_plural = _("Property Mappings")

    def __str__(self):
        return f"Property Mapping {self.name}"

    @property
    def component(self) -> str:
        """Return component used to edit this object.

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model.

        This base implementation always raises ``NotImplementedError``.
        """
        raise NotImplementedError

    def evaluate(self, user: User | None, request: HttpRequest | None, **kwargs) -> Any:
        """Evaluate `self.expression` using `**kwargs` as Context.

        Returns whatever the evaluator produces for the expression.
        A ``ControlFlowException`` is re-raised as-is; any other exception is
        wrapped in ``PropertyMappingExpressionException`` chained to the original.
        """
        from authentik.core.expression.evaluator import PropertyMappingEvaluator

        mapping_evaluator = PropertyMappingEvaluator(self, user, request, **kwargs)
        try:
            result = mapping_evaluator.evaluate(self.expression)
        except ControlFlowException as control_flow:
            # Control-flow signals must reach the caller untouched
            raise control_flow
        except Exception as error:
            raise PropertyMappingExpressionException(error, self) from error
        else:
            return result
User-defined key -> x mapping which can be used by providers to expose extra data.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def component(self) -> str:
    """Return component used to edit this object.

    This base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError
Return component used to edit this object
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model.

    This base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError
Get serializer for this model
def evaluate(self, user: User | None, request: HttpRequest | None, **kwargs) -> Any:
    """Evaluate `self.expression` using `**kwargs` as Context.

    Returns whatever the evaluator produces for the expression.
    A ``ControlFlowException`` is re-raised as-is; any other exception is
    wrapped in ``PropertyMappingExpressionException`` chained to the original.
    """
    from authentik.core.expression.evaluator import PropertyMappingEvaluator

    mapping_evaluator = PropertyMappingEvaluator(self, user, request, **kwargs)
    try:
        result = mapping_evaluator.evaluate(self.expression)
    except ControlFlowException as control_flow:
        # Control-flow signals must reach the caller untouched
        raise control_flow
    except Exception as error:
        raise PropertyMappingExpressionException(error, self) from error
    else:
        return result
Evaluate self.expression using **kwargs as Context.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.
In the example::
class Pizza(Model):
toppings = ManyToManyField(Topping, related_name='pizzas')
Pizza.toppings and Topping.pizzas are ManyToManyDescriptor
instances.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
class Session(ExpiringModel, AbstractBaseSession):
    """User session with extra fields for fast access

    The ``last_*`` columns mirror the entries of ``Session.Keys``.
    """

    # Remove upstream field because we're using our own ExpiringModel
    expire_date = None
    session_data = models.BinaryField(_("session data"))

    # Keep in sync with Session.Keys
    last_ip = models.GenericIPAddressField()
    last_user_agent = models.TextField(blank=True)
    last_used = models.DateTimeField(auto_now=True)

    class Keys(StrEnum):
        """
        Keys to be set with the session interface for the fields above to be updated.

        If a field is added here that needs to be initialized when the session is initialized,
        it must also be reflected in authentik.root.middleware.SessionMiddleware.process_request
        and in authentik.core.sessions.SessionStore.__init__
        """

        LAST_IP = "last_ip"
        LAST_USER_AGENT = "last_user_agent"
        LAST_USED = "last_used"

    class Meta:
        verbose_name = _("Session")
        verbose_name_plural = _("Sessions")
        # Indexes of ExpiringModel plus a combined expiry / session key index
        indexes = [
            *ExpiringModel.Meta.indexes,
            models.Index(fields=["expires", "session_key"]),
        ]
        default_permissions = []

    def __str__(self):
        return self.session_key

    @classmethod
    def get_session_store_class(cls):
        """Return the ``SessionStore`` class from ``authentik.core.sessions``."""
        # Imported inside the method, as in the original implementation
        from authentik.core.sessions import SessionStore

        return SessionStore

    def get_decoded(self):
        """Not implemented for this model; always raises ``NotImplementedError``."""
        raise NotImplementedError
User session with extra fields for fast access
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the reverse side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Place.restaurant is a ReverseOneToOneDescriptor instance.
Inherited Members
1341 class Keys(StrEnum): 1342 """ 1343 Keys to be set with the session interface for the fields above to be updated. 1344 1345 If a field is added here that needs to be initialized when the session is initialized, 1346 it must also be reflected in authentik.root.middleware.SessionMiddleware.process_request 1347 and in authentik.core.sessions.SessionStore.__init__ 1348 """ 1349 1350 LAST_IP = "last_ip" 1351 LAST_USER_AGENT = "last_user_agent" 1352 LAST_USED = "last_used"
Keys to be set with the session interface for the fields above to be updated.
If a field is added here that needs to be initialized when the session is initialized, it must also be reflected in authentik.root.middleware.SessionMiddleware.process_request and in authentik.core.sessions.SessionStore.__init__
The requested object does not exist
The query returned multiple objects when only one was expected.
class AuthenticatedSession(SerializerModel):
    """Associates a stored ``Session`` with the ``User`` it belongs to."""

    session = models.OneToOneField(Session, on_delete=models.CASCADE, primary_key=True)
    # We use the session as primary key, but we need the API to be able to reference
    # this object uniquely without exposing the session key
    uuid = models.UUIDField(default=uuid4, unique=True)

    user = models.ForeignKey(User, on_delete=models.CASCADE)

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        from authentik.core.api.authenticated_sessions import AuthenticatedSessionSerializer

        return AuthenticatedSessionSerializer

    class Meta:
        verbose_name = _("Authenticated Session")
        verbose_name_plural = _("Authenticated Sessions")

    def __str__(self) -> str:
        return f"Authenticated Session {str(self.pk)[:10]}"

    @staticmethod
    def from_request(request: HttpRequest, user: User) -> AuthenticatedSession | None:
        """Create a new session from a http request

        The returned instance is not saved.

        Returns ``None`` when the request carries no session, when the session
        store reports the session key as unknown, or when no ``Session`` row
        can be loaded for that key.
        """
        if not hasattr(request, "session"):
            return None
        session_key = request.session.session_key
        if not request.session.exists(session_key):
            return None
        session = Session.objects.filter(session_key=session_key).first()
        if session is None:
            # The row can disappear between the existence check and this lookup;
            # never build an instance whose primary-key relation is empty.
            return None
        return AuthenticatedSession(session=session, user=user)
AuthenticatedSession(session, uuid, user)
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model.

    Returns the ``AuthenticatedSessionSerializer`` class.
    """
    # Imported inside the property, as in the original implementation
    from authentik.core.api.authenticated_sessions import (
        AuthenticatedSessionSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
@staticmethod
def from_request(request: HttpRequest, user: User) -> AuthenticatedSession | None:
    """Create a new session from a http request

    The returned instance is not saved.

    Returns ``None`` when the request carries no session, when the session
    store reports the session key as unknown, or when no ``Session`` row
    can be loaded for that key.
    """
    if not hasattr(request, "session"):
        return None
    session_key = request.session.session_key
    if not request.session.exists(session_key):
        return None
    session = Session.objects.filter(session_key=session_key).first()
    if session is None:
        # The row can disappear between the existence check and this lookup;
        # never build an instance whose primary-key relation is empty.
        return None
    return AuthenticatedSession(session=session, user=user)
Create a new session from a http request
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_reverse_many_to_one_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.