authentik.core.api.users

User API Views

   1"""User API Views"""
   2
   3from datetime import timedelta
   4from json import loads
   5from typing import Any
   6
   7from django.contrib.auth import update_session_auth_hash
   8from django.contrib.auth.models import AnonymousUser, Permission
   9from django.db.models import Exists, OuterRef, Prefetch, Q
  10from django.db.transaction import atomic
  11from django.db.utils import IntegrityError
  12from django.urls import reverse_lazy
  13from django.utils.http import urlencode
  14from django.utils.text import slugify
  15from django.utils.timezone import now
  16from django.utils.translation import gettext as _
  17from django.utils.translation import gettext_lazy
  18from django_filters.filters import (
  19    BooleanFilter,
  20    CharFilter,
  21    IsoDateTimeFilter,
  22    ModelMultipleChoiceFilter,
  23    MultipleChoiceFilter,
  24    UUIDFilter,
  25)
  26from django_filters.filterset import FilterSet
  27from djangoql.schema import BoolField, StrField
  28from drf_spectacular.types import OpenApiTypes
  29from drf_spectacular.utils import (
  30    OpenApiParameter,
  31    OpenApiResponse,
  32    extend_schema,
  33    extend_schema_field,
  34    inline_serializer,
  35)
  36from rest_framework.authentication import SessionAuthentication
  37from rest_framework.decorators import action
  38from rest_framework.exceptions import ValidationError
  39from rest_framework.fields import (
  40    BooleanField,
  41    CharField,
  42    ChoiceField,
  43    DateTimeField,
  44    IntegerField,
  45    ListField,
  46    SerializerMethodField,
  47    UUIDField,
  48)
  49from rest_framework.permissions import IsAuthenticated
  50from rest_framework.request import Request
  51from rest_framework.response import Response
  52from rest_framework.serializers import (
  53    ListSerializer,
  54    PrimaryKeyRelatedField,
  55)
  56from rest_framework.validators import UniqueValidator
  57from rest_framework.viewsets import ModelViewSet
  58from structlog.stdlib import get_logger
  59
  60from authentik.api.authentication import TokenAuthentication
  61from authentik.api.search.fields import (
  62    ChoiceSearchField,
  63    JSONSearchField,
  64)
  65from authentik.api.validation import validate
  66from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
  67from authentik.brands.models import Brand
  68from authentik.core.api.used_by import UsedByMixin
  69from authentik.core.api.utils import (
  70    JSONDictField,
  71    LinkSerializer,
  72    ModelSerializer,
  73    PassiveSerializer,
  74)
  75from authentik.core.middleware import (
  76    SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
  77    SESSION_KEY_IMPERSONATE_USER,
  78)
  79from authentik.core.models import (
  80    USER_ATTRIBUTE_TOKEN_EXPIRING,
  81    USER_PATH_SERVICE_ACCOUNT,
  82    USERNAME_MAX_LENGTH,
  83    Group,
  84    Session,
  85    Token,
  86    TokenIntents,
  87    User,
  88    UserTypes,
  89    default_token_duration,
  90)
  91from authentik.endpoints.connectors.agent.auth import AgentAuth
  92from authentik.events.models import Event, EventAction
  93from authentik.flows.exceptions import FlowNonApplicableException
  94from authentik.flows.models import FlowToken
  95from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
  96from authentik.flows.views.executor import QS_KEY_TOKEN
  97from authentik.lib.avatars import get_avatar
  98from authentik.lib.utils.reflection import ConditionalInheritance
  99from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 100from authentik.rbac.api.roles import RoleSerializer
 101from authentik.rbac.decorators import permission_required
 102from authentik.rbac.models import Role, get_permission_choices
 103from authentik.stages.email.flow import pickle_flow_token_for_email
 104from authentik.stages.email.models import EmailStage
 105from authentik.stages.email.tasks import send_mails
 106from authentik.stages.email.utils import TemplateEmailMessage
 107
 108LOGGER = get_logger()
 109
 110INVALID_PASSWORD_HASH_MESSAGE = gettext_lazy(
 111    "Invalid password hash format. Must be a valid Django password hash."
 112)
 113
 114
 115class ParamUserSerializer(PassiveSerializer):
 116    """Partial serializer for query parameters to select a user"""
 117
 118    user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)
 119
 120
 121class PartialGroupSerializer(ModelSerializer):
 122    """Partial Group Serializer, does not include child relations."""
 123
 124    attributes = JSONDictField(required=False)
 125
 126    class Meta:
 127        model = Group
 128        fields = [
 129            "pk",
 130            "num_pk",
 131            "name",
 132            "is_superuser",
 133            "attributes",
 134        ]
 135
 136
 137class UserSerializer(ModelSerializer):
 138    """User Serializer"""
 139
 140    is_superuser = SerializerMethodField()
 141    avatar = SerializerMethodField()
 142    attributes = JSONDictField(required=False)
 143    groups = PrimaryKeyRelatedField(
 144        allow_empty=True,
 145        many=True,
 146        queryset=Group.objects.all().order_by("name"),
 147        default=list,
 148    )
 149    groups_obj = SerializerMethodField(allow_null=True)
 150    roles = PrimaryKeyRelatedField(
 151        allow_empty=True,
 152        many=True,
 153        queryset=Role.objects.all().order_by("name"),
 154        default=list,
 155    )
 156    roles_obj = SerializerMethodField(allow_null=True)
 157    uid = CharField(read_only=True)
 158    username = CharField(
 159        max_length=USERNAME_MAX_LENGTH,
 160        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
 161    )
 162
 163    @property
 164    def _should_include_groups(self) -> bool:
 165        request: Request = self.context.get("request", None)
 166        if not request:
 167            return True
 168        return str(request.query_params.get("include_groups", "true")).lower() == "true"
 169
 170    @property
 171    def _should_include_roles(self) -> bool:
 172        request: Request = self.context.get("request", None)
 173        if not request:
 174            return True
 175        return str(request.query_params.get("include_roles", "true")).lower() == "true"
 176
 177    @extend_schema_field(BooleanField)
 178    def get_is_superuser(self, instance: User) -> bool:
 179        """Use annotation if available to avoid N+1 query"""
 180        ann = getattr(instance, "_annotated_is_superuser", None)
 181        if ann is not None:
 182            return ann
 183        return instance.is_superuser
 184
 185    @extend_schema_field(PartialGroupSerializer(many=True))
 186    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
 187        if not self._should_include_groups:
 188            return None
 189        return PartialGroupSerializer(instance.groups, many=True).data
 190
 191    @extend_schema_field(RoleSerializer(many=True))
 192    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
 193        if not self._should_include_roles:
 194            return None
 195        return RoleSerializer(instance.roles, many=True).data
 196
 197    def __init__(self, *args, **kwargs):
 198        """Setting password and permissions directly is allowed only in blueprints."""
 199        super().__init__(*args, **kwargs)
 200        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 201            self.fields["password"] = CharField(required=False, allow_null=True)
 202            self.fields["password_hash"] = CharField(required=False, allow_null=True)
 203            self.fields["permissions"] = ListField(
 204                required=False,
 205                child=ChoiceField(choices=get_permission_choices()),
 206            )
 207
 208    def create(self, validated_data: dict) -> User:
 209        """Create a user, with blueprint-only password and permission writes."""
 210        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
 211        if is_blueprint:
 212            password = validated_data.pop("password", None)
 213            password_hash = validated_data.pop("password_hash", None)
 214            permissions = validated_data.pop("permissions", [])
 215            self._validate_password_inputs(password, password_hash)
 216
 217        instance: User = super().create(validated_data)
 218        if is_blueprint:
 219            self._set_password(instance, password, password_hash)
 220            perms_qs = Permission.objects.filter(
 221                codename__in=[permission.split(".")[1] for permission in permissions]
 222            ).values_list("content_type__app_label", "codename")
 223            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
 224            instance.assign_perms_to_managed_role(perms_list)
 225        self._ensure_password_not_empty(instance)
 226        return instance
 227
 228    def update(self, instance: User, validated_data: dict) -> User:
 229        """Update a user, with blueprint-only password and permission writes."""
 230        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
 231        if is_blueprint:
 232            password = validated_data.pop("password", None)
 233            password_hash = validated_data.pop("password_hash", None)
 234            permissions = validated_data.pop("permissions", [])
 235            self._validate_password_inputs(password, password_hash)
 236
 237        instance = super().update(instance, validated_data)
 238        if is_blueprint:
 239            self._set_password(instance, password, password_hash)
 240            perms_qs = Permission.objects.filter(
 241                codename__in=[permission.split(".")[1] for permission in permissions]
 242            ).values_list("content_type__app_label", "codename")
 243            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
 244            instance.assign_perms_to_managed_role(perms_list)
 245        self._ensure_password_not_empty(instance)
 246        return instance
 247
 248    def _validate_password_inputs(self, password: str | None, password_hash: str | None):
 249        """Validate mutually-exclusive password inputs before any model mutation."""
 250        if password is not None and password_hash is not None:
 251            raise ValidationError(_("Cannot set both password and password_hash. Use only one."))
 252        if password_hash is None:
 253            return
 254        try:
 255            User.validate_password_hash(password_hash)
 256        except ValueError as exc:
 257            LOGGER.warning("Failed to identify password hash format", exc_info=exc)
 258            raise ValidationError(INVALID_PASSWORD_HASH_MESSAGE) from exc
 259
 260    def _set_password(self, instance: User, password: str | None, password_hash: str | None = None):
 261        """Set password from plain text or hash."""
 262        if password_hash is not None:
 263            instance.set_password_from_hash(password_hash)
 264            instance.save()
 265        elif password:
 266            instance.set_password(password)
 267            instance.save()
 268
 269    def _ensure_password_not_empty(self, instance: User):
 270        """Store an explicit unusable password instead of an empty password field."""
 271        if len(instance.password) == 0:
 272            instance.set_unusable_password()
 273            instance.save()
 274
 275    def get_avatar(self, user: User) -> str:
 276        """User's avatar, either a http/https URL or a data URI"""
 277        return get_avatar(user, self.context.get("request"))
 278
 279    def validate_path(self, path: str) -> str:
 280        """Validate path"""
 281        if path[:1] == "/" or path[-1] == "/":
 282            raise ValidationError(_("No leading or trailing slashes allowed."))
 283        for segment in path.split("/"):
 284            if segment == "":
 285                raise ValidationError(_("No empty segments in user path allowed."))
 286        return path
 287
 288    def validate_type(self, user_type: str) -> str:
 289        """Validate user type, internal_service_account is an internal value"""
 290        if (
 291            self.instance
 292            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
 293            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
 294        ):
 295            raise ValidationError(_("Can't change internal service account to other user type."))
 296        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
 297            raise ValidationError(_("Setting a user to internal service account is not allowed."))
 298        return user_type
 299
 300    def validate_groups(self, groups: list) -> list:
 301        """Require enable_group_superuser permission when adding a user to a superuser group."""
 302        request: Request = self.context.get("request", None)
 303        if not request:
 304            return groups
 305        current_groups = set(self.instance.groups.all()) if self.instance else set()
 306        for group in groups:
 307            if not group.is_superuser:
 308                continue
 309            if group in current_groups:
 310                continue
 311            if not request.user.has_perm("authentik_core.enable_group_superuser"):
 312                raise ValidationError(
 313                    _("User does not have permission to add members to a superuser group.")
 314                )
 315        return groups
 316
 317    def validate_roles(self, roles: list) -> list:
 318        """Require change_role permission when assigning new roles to a user."""
 319        request: Request = self.context.get("request", None)
 320        if not request:
 321            return roles
 322        current_roles = set(self.instance.roles.all()) if self.instance else set()
 323        new_roles = [r for r in roles if r not in current_roles]
 324        if not new_roles:
 325            return roles
 326        if not request.user.has_perm("authentik_rbac.change_role"):
 327            raise ValidationError(_("User does not have permission to assign roles."))
 328        return roles
 329
 330    def validate(self, attrs: dict) -> dict:
 331        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
 332            raise ValidationError(_("Can't modify internal service account users"))
 333        return super().validate(attrs)
 334
 335    class Meta:
 336        model = User
 337        fields = [
 338            "pk",
 339            "username",
 340            "name",
 341            "is_active",
 342            "last_login",
 343            "date_joined",
 344            "is_superuser",
 345            "groups",
 346            "groups_obj",
 347            "roles",
 348            "roles_obj",
 349            "email",
 350            "avatar",
 351            "attributes",
 352            "uid",
 353            "path",
 354            "type",
 355            "uuid",
 356            "password_change_date",
 357            "last_updated",
 358        ]
 359        extra_kwargs = {
 360            "name": {"allow_blank": True},
 361            "date_joined": {"read_only": True},
 362            "password_change_date": {"read_only": True},
 363        }
 364
 365
 366class UserSelfSerializer(ModelSerializer):
 367    """User Serializer for information a user can retrieve about themselves"""
 368
 369    is_superuser = BooleanField(read_only=True)
 370    avatar = SerializerMethodField()
 371    groups = SerializerMethodField()
 372    roles = SerializerMethodField()
 373    uid = CharField(read_only=True)
 374    settings = SerializerMethodField()
 375    system_permissions = SerializerMethodField()
 376
 377    def get_avatar(self, user: User) -> str:
 378        """User's avatar, either a http/https URL or a data URI"""
 379        return get_avatar(user, self.context.get("request"))
 380
 381    @extend_schema_field(
 382        ListSerializer(
 383            child=inline_serializer(
 384                "UserSelfGroups",
 385                {
 386                    "name": CharField(read_only=True),
 387                    "pk": CharField(read_only=True),
 388                },
 389            )
 390        )
 391    )
 392    def get_groups(self, _: User):
 393        """Return only the group names a user is member of"""
 394        for group in self.instance.all_groups().order_by("name"):
 395            yield {
 396                "name": group.name,
 397                "pk": group.pk,
 398            }
 399
 400    @extend_schema_field(
 401        ListSerializer(
 402            child=inline_serializer(
 403                "UserSelfRoles",
 404                {
 405                    "name": CharField(read_only=True),
 406                    "pk": CharField(read_only=True),
 407                },
 408            )
 409        )
 410    )
 411    def get_roles(self, _: User):
 412        """Return only the roles a user is member of"""
 413        for role in self.instance.all_roles().order_by("name"):
 414            yield {
 415                "name": role.name,
 416                "pk": role.pk,
 417            }
 418
 419    def get_settings(self, user: User) -> dict[str, Any]:
 420        """Get user settings with brand and group settings applied"""
 421        return user.group_attributes(self._context["request"]).get("settings", {})
 422
 423    def get_system_permissions(self, user: User) -> list[str]:
 424        """Get all system permissions assigned to the user"""
 425        return list(
 426            x.split(".", maxsplit=1)[1]
 427            for x in user.get_all_permissions()
 428            if x.startswith("authentik_rbac")
 429        )
 430
 431    class Meta:
 432        model = User
 433        fields = [
 434            "pk",
 435            "username",
 436            "name",
 437            "is_active",
 438            "is_superuser",
 439            "groups",
 440            "roles",
 441            "email",
 442            "avatar",
 443            "uid",
 444            "settings",
 445            "type",
 446            "system_permissions",
 447        ]
 448        extra_kwargs = {
 449            "is_active": {"read_only": True},
 450            "name": {"allow_blank": True},
 451        }
 452
 453
 454class SessionUserSerializer(PassiveSerializer):
 455    """Response for the /user/me endpoint, returns the currently active user (as `user` property)
 456    and, if this user is being impersonated, the original user in the `original` property.
 457    """
 458
 459    user = UserSelfSerializer()
 460    original = UserSelfSerializer(required=False)
 461
 462
 463class UserPasswordSetSerializer(PassiveSerializer):
 464    """Payload to set a users' password directly"""
 465
 466    password = CharField(required=True)
 467
 468
 469class UserPasswordHashSetSerializer(PassiveSerializer):
 470    """Payload to set a users' password hash directly"""
 471
 472    password = CharField(required=True)
 473
 474
 475class UserServiceAccountSerializer(PassiveSerializer):
 476    """Payload to create a service account"""
 477
 478    name = CharField(
 479        required=True,
 480        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
 481    )
 482    create_group = BooleanField(default=False)
 483    expiring = BooleanField(default=True)
 484    expires = DateTimeField(
 485        required=False,
 486        help_text="If not provided, valid for 360 days",
 487    )
 488
 489
 490class UserRecoveryLinkSerializer(PassiveSerializer):
 491    """Payload to create a recovery link"""
 492
 493    token_duration = CharField(required=False)
 494
 495
 496class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
 497    """Payload to create and email a recovery link"""
 498
 499    email_stage = UUIDField()
 500
 501
 502class UsersFilter(FilterSet):
 503    """Filter for users"""
 504
 505    attributes = CharFilter(
 506        field_name="attributes",
 507        lookup_expr="",
 508        label="Attributes",
 509        method="filter_attributes",
 510    )
 511
 512    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
 513    date_joined = IsoDateTimeFilter(field_name="date_joined")
 514    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")
 515
 516    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
 517    last_updated = IsoDateTimeFilter(field_name="last_updated")
 518    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")
 519
 520    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
 521    last_login = IsoDateTimeFilter(field_name="last_login")
 522    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
 523    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")
 524
 525    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
 526    uuid = UUIDFilter(field_name="uuid")
 527
 528    path = CharFilter(field_name="path")
 529    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")
 530
 531    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")
 532
 533    groups_by_name = ModelMultipleChoiceFilter(
 534        field_name="groups__name",
 535        to_field_name="name",
 536        queryset=Group.objects.all().order_by("name"),
 537    )
 538    groups_by_pk = ModelMultipleChoiceFilter(
 539        field_name="groups",
 540        queryset=Group.objects.all().order_by("name"),
 541    )
 542
 543    roles_by_name = ModelMultipleChoiceFilter(
 544        field_name="roles__name",
 545        to_field_name="name",
 546        queryset=Role.objects.all().order_by("name"),
 547    )
 548    roles_by_pk = ModelMultipleChoiceFilter(
 549        field_name="roles",
 550        queryset=Role.objects.all().order_by("name"),
 551    )
 552
 553    def filter_is_superuser(self, queryset, name, value):
 554        if value:
 555            return queryset.filter(groups__is_superuser=True).distinct()
 556        return queryset.exclude(groups__is_superuser=True).distinct()
 557
 558    def filter_attributes(self, queryset, name, value):
 559        """Filter attributes by query args"""
 560        try:
 561            value = loads(value)
 562        except ValueError:
 563            raise ValidationError(_("filter: failed to parse JSON")) from None
 564        if not isinstance(value, dict):
 565            raise ValidationError(_("filter: value must be key:value mapping"))
 566        qs = {}
 567        for key, _value in value.items():
 568            qs[f"attributes__{key}"] = _value
 569        try:
 570            __ = len(queryset.filter(**qs))
 571            return queryset.filter(**qs)
 572        except ValueError:
 573            return queryset
 574
 575    class Meta:
 576        model = User
 577        fields = [
 578            "username",
 579            "email",
 580            "date_joined",
 581            "last_updated",
 582            "last_login",
 583            "name",
 584            "is_active",
 585            "is_superuser",
 586            "attributes",
 587            "groups_by_name",
 588            "groups_by_pk",
 589            "roles_by_name",
 590            "roles_by_pk",
 591            "type",
 592        ]
 593
 594
 595class UserViewSet(
 596    ConditionalInheritance(
 597        "authentik.enterprise.stages.account_lockdown.api.UserAccountLockdownMixin"
 598    ),
 599    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
 600    UsedByMixin,
 601    ModelViewSet,
 602):
 603    """User Viewset"""
 604
 605    queryset = User.objects.none()
 606    ordering = ["username", "date_joined", "last_updated", "last_login"]
 607    serializer_class = UserSerializer
 608    filterset_class = UsersFilter
 609    search_fields = ["email", "name", "uuid", "username"]
 610    authentication_classes = [
 611        TokenAuthentication,
 612        SessionAuthentication,
 613        AgentAuth,
 614    ]
 615
 616    def get_ql_fields(self):
 617        return [
 618            StrField(User, "username"),
 619            StrField(User, "name"),
 620            StrField(User, "email"),
 621            StrField(User, "path"),
 622            BoolField(User, "is_active", nullable=True),
 623            ChoiceSearchField(User, "type"),
 624            JSONSearchField(User, "attributes"),
 625        ]
 626
 627    def get_queryset(self):
 628        base_qs = User.objects.all().exclude_anonymous()
 629        # Always prefetch groups since group PKs are always serialized.
 630        # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise.
 631        if self.serializer_class(context={"request": self.request})._should_include_groups:
 632            base_qs = base_qs.prefetch_related("groups")
 633        else:
 634            base_qs = base_qs.prefetch_related(
 635                Prefetch("groups", queryset=Group.objects.all().only("group_uuid"))
 636            )
 637        if self.serializer_class(context={"request": self.request})._should_include_roles:
 638            base_qs = base_qs.prefetch_related("roles")
 639        else:
 640            base_qs = base_qs.prefetch_related(
 641                Prefetch("roles", queryset=Role.objects.all().only("uuid"))
 642            )
 643        # Annotate is_superuser to avoid N+1 query per user
 644        base_qs = base_qs.annotate(
 645            _annotated_is_superuser=Exists(
 646                Group.objects.filter(
 647                    is_superuser=True,
 648                ).filter(
 649                    Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk"))
 650                )
 651            )
 652        )
 653        return base_qs
 654
 655    @extend_schema(
 656        parameters=[
 657            OpenApiParameter("include_groups", bool, default=True),
 658            OpenApiParameter("include_roles", bool, default=True),
 659        ]
 660    )
 661    def list(self, request, *args, **kwargs):
 662        return super().list(request, *args, **kwargs)
 663
 664    def _create_recovery_link(
 665        self, token_duration: str | None, for_email=False
 666    ) -> tuple[str, Token]:
 667        """Create a recovery link (when the current brand has a recovery flow set),
 668        that can either be shown to an admin or sent to the user directly"""
 669        brand: Brand = self.request.brand
 670        # Check that there is a recovery flow, if not return an error
 671        flow = brand.flow_recovery
 672        if not flow:
 673            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
 674        user: User = self.get_object()
 675        planner = FlowPlanner(flow)
 676        planner.allow_empty_flows = True
 677        self.request._request.user = AnonymousUser()
 678        try:
 679            plan = planner.plan(
 680                self.request._request,
 681                {
 682                    PLAN_CONTEXT_PENDING_USER: user,
 683                },
 684            )
 685        except FlowNonApplicableException:
 686            raise ValidationError(
 687                {"non_field_errors": _("Recovery flow not applicable to user")}
 688            ) from None
 689        _plan = FlowToken.pickle(plan)
 690        if for_email:
 691            _plan = pickle_flow_token_for_email(plan)
 692        expires = default_token_duration()
 693        if token_duration:
 694            timedelta_string_validator(token_duration)
 695            expires = now() + timedelta_from_string(token_duration)
 696        token, __ = FlowToken.objects.update_or_create(
 697            identifier=f"{user.uid}-password-reset",
 698            defaults={
 699                "user": user,
 700                "flow": flow,
 701                "_plan": _plan,
 702                "revoke_on_execution": not for_email,
 703                "expires": expires,
 704            },
 705        )
 706        querystring = urlencode({QS_KEY_TOKEN: token.key})
 707        link = self.request.build_absolute_uri(
 708            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
 709            + f"?{querystring}"
 710        )
 711        return link, token
 712
 713    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
 714    @extend_schema(
 715        request=UserServiceAccountSerializer,
 716        responses={
 717            200: inline_serializer(
 718                "UserServiceAccountResponse",
 719                {
 720                    "username": CharField(required=True),
 721                    "token": CharField(required=True),
 722                    "user_uid": CharField(required=True),
 723                    "user_pk": IntegerField(required=True),
 724                    "group_pk": CharField(required=False),
 725                },
 726            )
 727        },
 728    )
 729    @action(
 730        detail=False,
 731        methods=["POST"],
 732        pagination_class=None,
 733        filter_backends=[],
 734    )
 735    @validate(UserServiceAccountSerializer)
 736    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
 737        """Create a new user account that is marked as a service account"""
 738        expires = body.validated_data.get("expires", now() + timedelta(days=360))
 739
 740        username = body.validated_data["name"]
 741        expiring = body.validated_data["expiring"]
 742        with atomic():
 743            try:
 744                user: User = User.objects.create(
 745                    username=username,
 746                    name=username,
 747                    type=UserTypes.SERVICE_ACCOUNT,
 748                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
 749                    path=USER_PATH_SERVICE_ACCOUNT,
 750                )
 751                user.set_unusable_password()
 752                user.save()
 753
 754                response = {
 755                    "username": user.username,
 756                    "user_uid": user.uid,
 757                    "user_pk": user.pk,
 758                }
 759                if body.validated_data["create_group"] and self.request.user.has_perm(
 760                    "authentik_core.add_group"
 761                ):
 762                    group = Group.objects.create(name=username)
 763                    group.users.add(user)
 764                    response["group_pk"] = str(group.pk)
 765                token = Token.objects.create(
 766                    identifier=slugify(f"service-account-{username}-password"),
 767                    intent=TokenIntents.INTENT_APP_PASSWORD,
 768                    user=user,
 769                    expires=expires,
 770                    expiring=expiring,
 771                )
 772                response["token"] = token.key
 773                return Response(response)
 774            except IntegrityError as exc:
 775                error_msg = str(exc).lower()
 776
 777                if "unique" in error_msg:
 778                    return Response(
 779                        data={
 780                            "non_field_errors": [
 781                                _("A user/group with these details already exists")
 782                            ]
 783                        },
 784                        status=400,
 785                    )
 786                else:
 787                    LOGGER.warning("Service account creation failed", exc=exc)
 788                    return Response(
 789                        data={"non_field_errors": [_("Unable to create user")]},
 790                        status=400,
 791                    )
 792            except (ValueError, TypeError) as exc:
 793                LOGGER.error("Unexpected error during service account creation", exc=exc)
 794                return Response(
 795                    data={"non_field_errors": [_("Unknown error occurred")]},
 796                    status=500,
 797                )
 798
 799    @extend_schema(responses={200: SessionUserSerializer(many=False)})
 800    @action(
 801        url_path="me",
 802        url_name="me",
 803        detail=False,
 804        pagination_class=None,
 805        filter_backends=[],
 806    )
 807    def user_me(self, request: Request) -> Response:
 808        """Get information about current user"""
 809        context = {"request": request}
 810        serializer = SessionUserSerializer(
 811            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
 812        )
 813        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
 814            serializer.initial_data["original"] = UserSelfSerializer(
 815                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
 816                context=context,
 817            ).data
 818        self.request.session.modified = True
 819        return Response(serializer.initial_data)
 820
 821    def _update_session_hash_after_password_change(self, request: Request, user: User):
 822        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
 823            LOGGER.debug("Updating session hash after password change")
 824            update_session_auth_hash(self.request, user)
 825
 826    @permission_required("authentik_core.reset_user_password")
 827    @extend_schema(
 828        request=UserPasswordSetSerializer,
 829        responses={
 830            204: OpenApiResponse(description="Successfully changed password"),
 831            400: OpenApiResponse(description="Bad request"),
 832        },
 833    )
 834    @action(
 835        detail=True,
 836        methods=["POST"],
 837        permission_classes=[IsAuthenticated],
 838    )
 839    @validate(UserPasswordSetSerializer)
 840    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
 841        """Set password for user"""
 842        user: User = self.get_object()
 843        try:
 844            user.set_password(body.validated_data["password"], request=request)
 845            user.save()
 846        except (ValidationError, IntegrityError) as exc:
 847            LOGGER.debug("Failed to set password", exc=exc)
 848            return Response(status=400)
 849        self._update_session_hash_after_password_change(request, user)
 850        return Response(status=204)
 851
 852    @permission_required("authentik_core.reset_user_password")
 853    @extend_schema(
 854        request=UserPasswordHashSetSerializer,
 855        responses={
 856            204: OpenApiResponse(description="Successfully changed password"),
 857            400: OpenApiResponse(description="Bad request"),
 858        },
 859    )
 860    @action(
 861        detail=True,
 862        methods=["POST"],
 863        permission_classes=[IsAuthenticated],
 864    )
 865    @validate(UserPasswordHashSetSerializer)
 866    def set_password_hash(
 867        self, request: Request, pk: int, body: UserPasswordHashSetSerializer
 868    ) -> Response:
 869        """Set a user's password from a pre-hashed Django password value.
 870
 871        Submit the Django password hash in the shared ``password`` request field.
 872
 873        This updates authentik's local password verifier only. It does not attempt
 874        to propagate the password change to LDAP or Kerberos because no raw password
 875        is available from the request payload.
 876        """
 877        user: User = self.get_object()
 878        try:
 879            user.set_password_from_hash(body.validated_data["password"], request=request)
 880            user.save()
 881        except ValueError as exc:
 882            LOGGER.debug("Failed to set password hash", exc=exc)
 883            return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400)
 884        except (ValidationError, IntegrityError) as exc:
 885            LOGGER.debug("Failed to set password hash", exc=exc)
 886            return Response(status=400)
 887        self._update_session_hash_after_password_change(request, user)
 888        return Response(status=204)
 889
 890    @permission_required("authentik_core.reset_user_password")
 891    @extend_schema(
 892        request=UserRecoveryLinkSerializer,
 893        responses={
 894            "200": LinkSerializer(many=False),
 895        },
 896    )
 897    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
 898    @validate(UserRecoveryLinkSerializer)
 899    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
 900        """Create a temporary link that a user can use to recover their account"""
 901        link, _ = self._create_recovery_link(
 902            token_duration=body.validated_data.get("token_duration")
 903        )
 904        return Response({"link": link})
 905
 906    @permission_required("authentik_core.reset_user_password")
 907    @extend_schema(
 908        request=UserRecoveryEmailSerializer,
 909        responses={
 910            "204": OpenApiResponse(description="Successfully sent recover email"),
 911        },
 912    )
 913    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
 914    @validate(UserRecoveryEmailSerializer)
 915    def recovery_email(
 916        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
 917    ) -> Response:
 918        """Send an email with a temporary link that a user can use to recover their account"""
 919        email_error_message = _("User does not have an email address set.")
 920        stage_error_message = _("Email stage not found.")
 921        user: User = self.get_object()
 922        if not user.email:
 923            LOGGER.debug("User doesn't have an email address")
 924            raise ValidationError({"non_field_errors": email_error_message})
 925        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
 926            LOGGER.debug("Email stage does not exist")
 927            raise ValidationError({"non_field_errors": stage_error_message})
 928        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
 929            LOGGER.debug("User has no view access to email stage")
 930            raise ValidationError({"non_field_errors": stage_error_message})
 931        link, token = self._create_recovery_link(
 932            token_duration=body.validated_data.get("token_duration"), for_email=True
 933        )
 934        message = TemplateEmailMessage(
 935            subject=_(stage.subject),
 936            to=[(user.name, user.email)],
 937            template_name=stage.template,
 938            language=user.locale(request),
 939            template_context={
 940                "url": link,
 941                "user": user,
 942                "expires": token.expires,
 943            },
 944        )
 945        send_mails(stage, message)
 946        return Response(status=204)
 947
 948    @permission_required("authentik_core.impersonate")
 949    @extend_schema(
 950        request=inline_serializer(
 951            "ImpersonationSerializer",
 952            {
 953                "reason": CharField(required=True),
 954            },
 955        ),
 956        responses={
 957            204: OpenApiResponse(description="Successfully started impersonation"),
 958        },
 959    )
 960    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
 961    def impersonate(self, request: Request, pk: int) -> Response:
 962        """Impersonate a user"""
 963        if not request.tenant.impersonation:
 964            LOGGER.debug("User attempted to impersonate", user=request.user)
 965            return Response(status=401)
 966        user_to_be = self.get_object()
 967        reason = request.data.get("reason", "")
 968        # Check both object-level perms and global perms
 969        if not request.user.has_perm(
 970            "authentik_core.impersonate", user_to_be
 971        ) and not request.user.has_perm("authentik_core.impersonate"):
 972            LOGGER.debug(
 973                "User attempted to impersonate without permissions",
 974                user=request.user,
 975            )
 976            return Response(status=403)
 977        if user_to_be.pk == self.request.user.pk:
 978            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
 979            return Response(status=401)
 980        if not reason and request.tenant.impersonation_require_reason:
 981            LOGGER.debug(
 982                "User attempted to impersonate without providing a reason",
 983                user=request.user,
 984            )
 985            raise ValidationError({"reason": _("This field is required.")})
 986
 987        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
 988        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
 989
 990        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
 991
 992        return Response(status=204)
 993
 994    @extend_schema(
 995        request=None,
 996        responses={
 997            "204": OpenApiResponse(description="Successfully ended impersonation"),
 998        },
 999    )
1000    @action(detail=False, methods=["GET"])
1001    def impersonate_end(self, request: Request) -> Response:
1002        """End Impersonation a user"""
1003        if (
1004            SESSION_KEY_IMPERSONATE_USER not in request.session
1005            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
1006        ):
1007            LOGGER.debug("Can't end impersonation", user=request.user)
1008            return Response(status=204)
1009
1010        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1011
1012        del request.session[SESSION_KEY_IMPERSONATE_USER]
1013        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1014
1015        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
1016
1017        return Response(status=204)
1018
1019    @extend_schema(
1020        responses={
1021            200: inline_serializer(
1022                "UserPathSerializer",
1023                {"paths": ListField(child=CharField(), read_only=True)},
1024            )
1025        },
1026        parameters=[
1027            OpenApiParameter(
1028                name="search",
1029                location=OpenApiParameter.QUERY,
1030                type=OpenApiTypes.STR,
1031            )
1032        ],
1033    )
1034    @action(detail=False, pagination_class=None)
1035    def paths(self, request: Request) -> Response:
1036        """Get all user paths"""
1037        return Response(
1038            data={
1039                "paths": list(
1040                    self.filter_queryset(self.get_queryset())
1041                    .values("path")
1042                    .distinct()
1043                    .order_by("path")
1044                    .values_list("path", flat=True)
1045                )
1046            }
1047        )
1048
1049    def partial_update(self, request: Request, *args, **kwargs) -> Response:
1050        response = super().partial_update(request, *args, **kwargs)
1051        instance: User = self.get_object()
1052        if not instance.is_active:
1053            Session.objects.filter(authenticatedsession__user=instance).delete()
1054            LOGGER.debug("Deleted user's sessions", user=instance.username)
1055        return response
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
INVALID_PASSWORD_HASH_MESSAGE = 'Invalid password hash format. Must be a valid Django password hash.'
class ParamUserSerializer(authentik.core.api.utils.PassiveSerializer):
116class ParamUserSerializer(PassiveSerializer):
117    """Partial serializer for query parameters to select a user"""
118
119    user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)

Partial serializer for query parameters to select a user

user
class PartialGroupSerializer(authentik.core.api.utils.ModelSerializer):
122class PartialGroupSerializer(ModelSerializer):
123    """Partial Group Serializer, does not include child relations."""
124
125    attributes = JSONDictField(required=False)
126
127    class Meta:
128        model = Group
129        fields = [
130            "pk",
131            "num_pk",
132            "name",
133            "is_superuser",
134            "attributes",
135        ]

Partial Group Serializer, does not include child relations.

attributes
class PartialGroupSerializer.Meta:
127    class Meta:
128        model = Group
129        fields = [
130            "pk",
131            "num_pk",
132            "name",
133            "is_superuser",
134            "attributes",
135        ]
model = <class 'authentik.core.models.Group'>
fields = ['pk', 'num_pk', 'name', 'is_superuser', 'attributes']
class UserSerializer(authentik.core.api.utils.ModelSerializer):
138class UserSerializer(ModelSerializer):
139    """User Serializer"""
140
141    is_superuser = SerializerMethodField()
142    avatar = SerializerMethodField()
143    attributes = JSONDictField(required=False)
144    groups = PrimaryKeyRelatedField(
145        allow_empty=True,
146        many=True,
147        queryset=Group.objects.all().order_by("name"),
148        default=list,
149    )
150    groups_obj = SerializerMethodField(allow_null=True)
151    roles = PrimaryKeyRelatedField(
152        allow_empty=True,
153        many=True,
154        queryset=Role.objects.all().order_by("name"),
155        default=list,
156    )
157    roles_obj = SerializerMethodField(allow_null=True)
158    uid = CharField(read_only=True)
159    username = CharField(
160        max_length=USERNAME_MAX_LENGTH,
161        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
162    )
163
164    @property
165    def _should_include_groups(self) -> bool:
166        request: Request = self.context.get("request", None)
167        if not request:
168            return True
169        return str(request.query_params.get("include_groups", "true")).lower() == "true"
170
171    @property
172    def _should_include_roles(self) -> bool:
173        request: Request = self.context.get("request", None)
174        if not request:
175            return True
176        return str(request.query_params.get("include_roles", "true")).lower() == "true"
177
178    @extend_schema_field(BooleanField)
179    def get_is_superuser(self, instance: User) -> bool:
180        """Use annotation if available to avoid N+1 query"""
181        ann = getattr(instance, "_annotated_is_superuser", None)
182        if ann is not None:
183            return ann
184        return instance.is_superuser
185
186    @extend_schema_field(PartialGroupSerializer(many=True))
187    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
188        if not self._should_include_groups:
189            return None
190        return PartialGroupSerializer(instance.groups, many=True).data
191
192    @extend_schema_field(RoleSerializer(many=True))
193    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
194        if not self._should_include_roles:
195            return None
196        return RoleSerializer(instance.roles, many=True).data
197
198    def __init__(self, *args, **kwargs):
199        """Setting password and permissions directly is allowed only in blueprints."""
200        super().__init__(*args, **kwargs)
201        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
202            self.fields["password"] = CharField(required=False, allow_null=True)
203            self.fields["password_hash"] = CharField(required=False, allow_null=True)
204            self.fields["permissions"] = ListField(
205                required=False,
206                child=ChoiceField(choices=get_permission_choices()),
207            )
208
209    def create(self, validated_data: dict) -> User:
210        """Create a user, with blueprint-only password and permission writes."""
211        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
212        if is_blueprint:
213            password = validated_data.pop("password", None)
214            password_hash = validated_data.pop("password_hash", None)
215            permissions = validated_data.pop("permissions", [])
216            self._validate_password_inputs(password, password_hash)
217
218        instance: User = super().create(validated_data)
219        if is_blueprint:
220            self._set_password(instance, password, password_hash)
221            perms_qs = Permission.objects.filter(
222                codename__in=[permission.split(".")[1] for permission in permissions]
223            ).values_list("content_type__app_label", "codename")
224            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
225            instance.assign_perms_to_managed_role(perms_list)
226        self._ensure_password_not_empty(instance)
227        return instance
228
229    def update(self, instance: User, validated_data: dict) -> User:
230        """Update a user, with blueprint-only password and permission writes."""
231        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
232        if is_blueprint:
233            password = validated_data.pop("password", None)
234            password_hash = validated_data.pop("password_hash", None)
235            permissions = validated_data.pop("permissions", [])
236            self._validate_password_inputs(password, password_hash)
237
238        instance = super().update(instance, validated_data)
239        if is_blueprint:
240            self._set_password(instance, password, password_hash)
241            perms_qs = Permission.objects.filter(
242                codename__in=[permission.split(".")[1] for permission in permissions]
243            ).values_list("content_type__app_label", "codename")
244            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
245            instance.assign_perms_to_managed_role(perms_list)
246        self._ensure_password_not_empty(instance)
247        return instance
248
249    def _validate_password_inputs(self, password: str | None, password_hash: str | None):
250        """Validate mutually-exclusive password inputs before any model mutation."""
251        if password is not None and password_hash is not None:
252            raise ValidationError(_("Cannot set both password and password_hash. Use only one."))
253        if password_hash is None:
254            return
255        try:
256            User.validate_password_hash(password_hash)
257        except ValueError as exc:
258            LOGGER.warning("Failed to identify password hash format", exc_info=exc)
259            raise ValidationError(INVALID_PASSWORD_HASH_MESSAGE) from exc
260
261    def _set_password(self, instance: User, password: str | None, password_hash: str | None = None):
262        """Set password from plain text or hash."""
263        if password_hash is not None:
264            instance.set_password_from_hash(password_hash)
265            instance.save()
266        elif password:
267            instance.set_password(password)
268            instance.save()
269
270    def _ensure_password_not_empty(self, instance: User):
271        """Store an explicit unusable password instead of an empty password field."""
272        if len(instance.password) == 0:
273            instance.set_unusable_password()
274            instance.save()
275
276    def get_avatar(self, user: User) -> str:
277        """User's avatar, either a http/https URL or a data URI"""
278        return get_avatar(user, self.context.get("request"))
279
280    def validate_path(self, path: str) -> str:
281        """Validate path"""
282        if path[:1] == "/" or path[-1] == "/":
283            raise ValidationError(_("No leading or trailing slashes allowed."))
284        for segment in path.split("/"):
285            if segment == "":
286                raise ValidationError(_("No empty segments in user path allowed."))
287        return path
288
289    def validate_type(self, user_type: str) -> str:
290        """Validate user type, internal_service_account is an internal value"""
291        if (
292            self.instance
293            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
294            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
295        ):
296            raise ValidationError(_("Can't change internal service account to other user type."))
297        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
298            raise ValidationError(_("Setting a user to internal service account is not allowed."))
299        return user_type
300
301    def validate_groups(self, groups: list) -> list:
302        """Require enable_group_superuser permission when adding a user to a superuser group."""
303        request: Request = self.context.get("request", None)
304        if not request:
305            return groups
306        current_groups = set(self.instance.groups.all()) if self.instance else set()
307        for group in groups:
308            if not group.is_superuser:
309                continue
310            if group in current_groups:
311                continue
312            if not request.user.has_perm("authentik_core.enable_group_superuser"):
313                raise ValidationError(
314                    _("User does not have permission to add members to a superuser group.")
315                )
316        return groups
317
318    def validate_roles(self, roles: list) -> list:
319        """Require change_role permission when assigning new roles to a user."""
320        request: Request = self.context.get("request", None)
321        if not request:
322            return roles
323        current_roles = set(self.instance.roles.all()) if self.instance else set()
324        new_roles = [r for r in roles if r not in current_roles]
325        if not new_roles:
326            return roles
327        if not request.user.has_perm("authentik_rbac.change_role"):
328            raise ValidationError(_("User does not have permission to assign roles."))
329        return roles
330
331    def validate(self, attrs: dict) -> dict:
332        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
333            raise ValidationError(_("Can't modify internal service account users"))
334        return super().validate(attrs)
335
336    class Meta:
337        model = User
338        fields = [
339            "pk",
340            "username",
341            "name",
342            "is_active",
343            "last_login",
344            "date_joined",
345            "is_superuser",
346            "groups",
347            "groups_obj",
348            "roles",
349            "roles_obj",
350            "email",
351            "avatar",
352            "attributes",
353            "uid",
354            "path",
355            "type",
356            "uuid",
357            "password_change_date",
358            "last_updated",
359        ]
360        extra_kwargs = {
361            "name": {"allow_blank": True},
362            "date_joined": {"read_only": True},
363            "password_change_date": {"read_only": True},
364        }

User Serializer

UserSerializer(*args, **kwargs)
198    def __init__(self, *args, **kwargs):
199        """Setting password and permissions directly is allowed only in blueprints."""
200        super().__init__(*args, **kwargs)
201        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
202            self.fields["password"] = CharField(required=False, allow_null=True)
203            self.fields["password_hash"] = CharField(required=False, allow_null=True)
204            self.fields["permissions"] = ListField(
205                required=False,
206                child=ChoiceField(choices=get_permission_choices()),
207            )

Setting password and permissions directly is allowed only in blueprints.

is_superuser
avatar
attributes
groups
groups_obj
roles
roles_obj
uid
username
@extend_schema_field(BooleanField)
def get_is_superuser(self, instance: authentik.core.models.User) -> bool:
178    @extend_schema_field(BooleanField)
179    def get_is_superuser(self, instance: User) -> bool:
180        """Use annotation if available to avoid N+1 query"""
181        ann = getattr(instance, "_annotated_is_superuser", None)
182        if ann is not None:
183            return ann
184        return instance.is_superuser

Use annotation if available to avoid N+1 query

@extend_schema_field(PartialGroupSerializer(many=True))
def get_groups_obj( self, instance: authentik.core.models.User) -> list[PartialGroupSerializer] | None:
186    @extend_schema_field(PartialGroupSerializer(many=True))
187    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
188        if not self._should_include_groups:
189            return None
190        return PartialGroupSerializer(instance.groups, many=True).data
@extend_schema_field(RoleSerializer(many=True))
def get_roles_obj( self, instance: authentik.core.models.User) -> list[authentik.rbac.api.roles.RoleSerializer] | None:
192    @extend_schema_field(RoleSerializer(many=True))
193    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
194        if not self._should_include_roles:
195            return None
196        return RoleSerializer(instance.roles, many=True).data
def create(self, validated_data: dict) -> authentik.core.models.User:
209    def create(self, validated_data: dict) -> User:
210        """Create a user, with blueprint-only password and permission writes."""
211        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
212        if is_blueprint:
213            password = validated_data.pop("password", None)
214            password_hash = validated_data.pop("password_hash", None)
215            permissions = validated_data.pop("permissions", [])
216            self._validate_password_inputs(password, password_hash)
217
218        instance: User = super().create(validated_data)
219        if is_blueprint:
220            self._set_password(instance, password, password_hash)
221            perms_qs = Permission.objects.filter(
222                codename__in=[permission.split(".")[1] for permission in permissions]
223            ).values_list("content_type__app_label", "codename")
224            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
225            instance.assign_perms_to_managed_role(perms_list)
226        self._ensure_password_not_empty(instance)
227        return instance

Create a user, with blueprint-only password and permission writes.

def update( self, instance: authentik.core.models.User, validated_data: dict) -> authentik.core.models.User:
229    def update(self, instance: User, validated_data: dict) -> User:
230        """Update a user, with blueprint-only password and permission writes."""
231        is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context
232        if is_blueprint:
233            password = validated_data.pop("password", None)
234            password_hash = validated_data.pop("password_hash", None)
235            permissions = validated_data.pop("permissions", [])
236            self._validate_password_inputs(password, password_hash)
237
238        instance = super().update(instance, validated_data)
239        if is_blueprint:
240            self._set_password(instance, password, password_hash)
241            perms_qs = Permission.objects.filter(
242                codename__in=[permission.split(".")[1] for permission in permissions]
243            ).values_list("content_type__app_label", "codename")
244            perms_list = [f"{ct}.{name}" for ct, name in perms_qs]
245            instance.assign_perms_to_managed_role(perms_list)
246        self._ensure_password_not_empty(instance)
247        return instance

Update a user, with blueprint-only password and permission writes.

def get_avatar(self, user: authentik.core.models.User) -> str:
276    def get_avatar(self, user: User) -> str:
277        """User's avatar, either a http/https URL or a data URI"""
278        return get_avatar(user, self.context.get("request"))

User's avatar, either a http/https URL or a data URI

def validate_path(self, path: str) -> str:
280    def validate_path(self, path: str) -> str:
281        """Validate path"""
282        if path[:1] == "/" or path[-1] == "/":
283            raise ValidationError(_("No leading or trailing slashes allowed."))
284        for segment in path.split("/"):
285            if segment == "":
286                raise ValidationError(_("No empty segments in user path allowed."))
287        return path

Validate path

def validate_type(self, user_type: str) -> str:
289    def validate_type(self, user_type: str) -> str:
290        """Validate user type, internal_service_account is an internal value"""
291        if (
292            self.instance
293            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
294            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
295        ):
296            raise ValidationError(_("Can't change internal service account to other user type."))
297        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
298            raise ValidationError(_("Setting a user to internal service account is not allowed."))
299        return user_type

Validate user type, internal_service_account is an internal value

def validate_groups(self, groups: list) -> list:
301    def validate_groups(self, groups: list) -> list:
302        """Require enable_group_superuser permission when adding a user to a superuser group."""
303        request: Request = self.context.get("request", None)
304        if not request:
305            return groups
306        current_groups = set(self.instance.groups.all()) if self.instance else set()
307        for group in groups:
308            if not group.is_superuser:
309                continue
310            if group in current_groups:
311                continue
312            if not request.user.has_perm("authentik_core.enable_group_superuser"):
313                raise ValidationError(
314                    _("User does not have permission to add members to a superuser group.")
315                )
316        return groups

Require enable_group_superuser permission when adding a user to a superuser group.

def validate_roles(self, roles: list) -> list:
318    def validate_roles(self, roles: list) -> list:
319        """Require change_role permission when assigning new roles to a user."""
320        request: Request = self.context.get("request", None)
321        if not request:
322            return roles
323        current_roles = set(self.instance.roles.all()) if self.instance else set()
324        new_roles = [r for r in roles if r not in current_roles]
325        if not new_roles:
326            return roles
327        if not request.user.has_perm("authentik_rbac.change_role"):
328            raise ValidationError(_("User does not have permission to assign roles."))
329        return roles

Require change_role permission when assigning new roles to a user.

def validate(self, attrs: dict) -> dict:
331    def validate(self, attrs: dict) -> dict:
332        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
333            raise ValidationError(_("Can't modify internal service account users"))
334        return super().validate(attrs)
class UserSerializer.Meta:
336    class Meta:
337        model = User
338        fields = [
339            "pk",
340            "username",
341            "name",
342            "is_active",
343            "last_login",
344            "date_joined",
345            "is_superuser",
346            "groups",
347            "groups_obj",
348            "roles",
349            "roles_obj",
350            "email",
351            "avatar",
352            "attributes",
353            "uid",
354            "path",
355            "type",
356            "uuid",
357            "password_change_date",
358            "last_updated",
359        ]
360        extra_kwargs = {
361            "name": {"allow_blank": True},
362            "date_joined": {"read_only": True},
363            "password_change_date": {"read_only": True},
364        }
model = <class 'authentik.core.models.User'>
fields = ['pk', 'username', 'name', 'is_active', 'last_login', 'date_joined', 'is_superuser', 'groups', 'groups_obj', 'roles', 'roles_obj', 'email', 'avatar', 'attributes', 'uid', 'path', 'type', 'uuid', 'password_change_date', 'last_updated']
extra_kwargs = {'name': {'allow_blank': True}, 'date_joined': {'read_only': True}, 'password_change_date': {'read_only': True}}
class UserSelfSerializer(authentik.core.api.utils.ModelSerializer):
367class UserSelfSerializer(ModelSerializer):
368    """User Serializer for information a user can retrieve about themselves"""
369
370    is_superuser = BooleanField(read_only=True)
371    avatar = SerializerMethodField()
372    groups = SerializerMethodField()
373    roles = SerializerMethodField()
374    uid = CharField(read_only=True)
375    settings = SerializerMethodField()
376    system_permissions = SerializerMethodField()
377
378    def get_avatar(self, user: User) -> str:
379        """User's avatar, either a http/https URL or a data URI"""
380        return get_avatar(user, self.context.get("request"))
381
382    @extend_schema_field(
383        ListSerializer(
384            child=inline_serializer(
385                "UserSelfGroups",
386                {
387                    "name": CharField(read_only=True),
388                    "pk": CharField(read_only=True),
389                },
390            )
391        )
392    )
393    def get_groups(self, _: User):
394        """Return only the group names a user is member of"""
395        for group in self.instance.all_groups().order_by("name"):
396            yield {
397                "name": group.name,
398                "pk": group.pk,
399            }
400
401    @extend_schema_field(
402        ListSerializer(
403            child=inline_serializer(
404                "UserSelfRoles",
405                {
406                    "name": CharField(read_only=True),
407                    "pk": CharField(read_only=True),
408                },
409            )
410        )
411    )
412    def get_roles(self, _: User):
413        """Return only the roles a user is member of"""
414        for role in self.instance.all_roles().order_by("name"):
415            yield {
416                "name": role.name,
417                "pk": role.pk,
418            }
419
420    def get_settings(self, user: User) -> dict[str, Any]:
421        """Get user settings with brand and group settings applied"""
422        return user.group_attributes(self._context["request"]).get("settings", {})
423
424    def get_system_permissions(self, user: User) -> list[str]:
425        """Get all system permissions assigned to the user"""
426        return list(
427            x.split(".", maxsplit=1)[1]
428            for x in user.get_all_permissions()
429            if x.startswith("authentik_rbac")
430        )
431
432    class Meta:
433        model = User
434        fields = [
435            "pk",
436            "username",
437            "name",
438            "is_active",
439            "is_superuser",
440            "groups",
441            "roles",
442            "email",
443            "avatar",
444            "uid",
445            "settings",
446            "type",
447            "system_permissions",
448        ]
449        extra_kwargs = {
450            "is_active": {"read_only": True},
451            "name": {"allow_blank": True},
452        }

User Serializer for information a user can retrieve about themselves

is_superuser
avatar
groups
roles
uid
settings
system_permissions
def get_avatar(self, user: authentik.core.models.User) -> str:
378    def get_avatar(self, user: User) -> str:
379        """User's avatar, either a http/https URL or a data URI"""
380        return get_avatar(user, self.context.get("request"))

User's avatar, either a http/https URL or a data URI

@extend_schema_field(ListSerializer(child=inline_serializer('UserSelfGroups', {'name': CharField(read_only=True), 'pk': CharField(read_only=True)})))
def get_groups(self, _: authentik.core.models.User):
382    @extend_schema_field(
383        ListSerializer(
384            child=inline_serializer(
385                "UserSelfGroups",
386                {
387                    "name": CharField(read_only=True),
388                    "pk": CharField(read_only=True),
389                },
390            )
391        )
392    )
393    def get_groups(self, _: User):
394        """Return only the group names a user is member of"""
395        for group in self.instance.all_groups().order_by("name"):
396            yield {
397                "name": group.name,
398                "pk": group.pk,
399            }

Return only the group names a user is member of

@extend_schema_field(ListSerializer(child=inline_serializer('UserSelfRoles', {'name': CharField(read_only=True), 'pk': CharField(read_only=True)})))
def get_roles(self, _: authentik.core.models.User):
401    @extend_schema_field(
402        ListSerializer(
403            child=inline_serializer(
404                "UserSelfRoles",
405                {
406                    "name": CharField(read_only=True),
407                    "pk": CharField(read_only=True),
408                },
409            )
410        )
411    )
412    def get_roles(self, _: User):
413        """Return only the roles a user is member of"""
414        for role in self.instance.all_roles().order_by("name"):
415            yield {
416                "name": role.name,
417                "pk": role.pk,
418            }

Return only the roles a user is member of

def get_settings(self, user: authentik.core.models.User) -> dict[str, typing.Any]:
420    def get_settings(self, user: User) -> dict[str, Any]:
421        """Get user settings with brand and group settings applied"""
422        return user.group_attributes(self._context["request"]).get("settings", {})

Get user settings with brand and group settings applied

def get_system_permissions(self, user: authentik.core.models.User) -> list[str]:
424    def get_system_permissions(self, user: User) -> list[str]:
425        """Get all system permissions assigned to the user"""
426        return list(
427            x.split(".", maxsplit=1)[1]
428            for x in user.get_all_permissions()
429            if x.startswith("authentik_rbac")
430        )

Get all system permissions assigned to the user

class UserSelfSerializer.Meta:
432    class Meta:
433        model = User
434        fields = [
435            "pk",
436            "username",
437            "name",
438            "is_active",
439            "is_superuser",
440            "groups",
441            "roles",
442            "email",
443            "avatar",
444            "uid",
445            "settings",
446            "type",
447            "system_permissions",
448        ]
449        extra_kwargs = {
450            "is_active": {"read_only": True},
451            "name": {"allow_blank": True},
452        }
model = <class 'authentik.core.models.User'>
fields = ['pk', 'username', 'name', 'is_active', 'is_superuser', 'groups', 'roles', 'email', 'avatar', 'uid', 'settings', 'type', 'system_permissions']
extra_kwargs = {'is_active': {'read_only': True}, 'name': {'allow_blank': True}}
class SessionUserSerializer(authentik.core.api.utils.PassiveSerializer):
455class SessionUserSerializer(PassiveSerializer):
456    """Response for the /user/me endpoint, returns the currently active user (as `user` property)
457    and, if this user is being impersonated, the original user in the `original` property.
458    """
459
460    user = UserSelfSerializer()
461    original = UserSelfSerializer(required=False)

Response for the /user/me endpoint, returns the currently active user (as user property) and, if this user is being impersonated, the original user in the original property.

user
original
class UserPasswordSetSerializer(authentik.core.api.utils.PassiveSerializer):
464class UserPasswordSetSerializer(PassiveSerializer):
465    """Payload to set a users' password directly"""
466
467    password = CharField(required=True)

Payload to set a users' password directly

password
class UserPasswordHashSetSerializer(authentik.core.api.utils.PassiveSerializer):
470class UserPasswordHashSetSerializer(PassiveSerializer):
471    """Payload to set a users' password hash directly"""
472
473    password = CharField(required=True)

Payload to set a users' password hash directly

password
class UserServiceAccountSerializer(authentik.core.api.utils.PassiveSerializer):
476class UserServiceAccountSerializer(PassiveSerializer):
477    """Payload to create a service account"""
478
479    name = CharField(
480        required=True,
481        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
482    )
483    create_group = BooleanField(default=False)
484    expiring = BooleanField(default=True)
485    expires = DateTimeField(
486        required=False,
487        help_text="If not provided, valid for 360 days",
488    )

Payload to create a service account

name
create_group
expiring
expires
class UserRecoveryLinkSerializer(authentik.core.api.utils.PassiveSerializer):
491class UserRecoveryLinkSerializer(PassiveSerializer):
492    """Payload to create a recovery link"""
493
494    token_duration = CharField(required=False)

Payload to create a recovery link

token_duration
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
497class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
498    """Payload to create and email a recovery link"""
499
500    email_stage = UUIDField()

Payload to create and email a recovery link

email_stage
class UsersFilter(django_filters.filterset.FilterSet):
503class UsersFilter(FilterSet):
504    """Filter for users"""
505
506    attributes = CharFilter(
507        field_name="attributes",
508        lookup_expr="",
509        label="Attributes",
510        method="filter_attributes",
511    )
512
513    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
514    date_joined = IsoDateTimeFilter(field_name="date_joined")
515    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")
516
517    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
518    last_updated = IsoDateTimeFilter(field_name="last_updated")
519    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")
520
521    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
522    last_login = IsoDateTimeFilter(field_name="last_login")
523    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
524    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")
525
526    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
527    uuid = UUIDFilter(field_name="uuid")
528
529    path = CharFilter(field_name="path")
530    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")
531
532    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")
533
534    groups_by_name = ModelMultipleChoiceFilter(
535        field_name="groups__name",
536        to_field_name="name",
537        queryset=Group.objects.all().order_by("name"),
538    )
539    groups_by_pk = ModelMultipleChoiceFilter(
540        field_name="groups",
541        queryset=Group.objects.all().order_by("name"),
542    )
543
544    roles_by_name = ModelMultipleChoiceFilter(
545        field_name="roles__name",
546        to_field_name="name",
547        queryset=Role.objects.all().order_by("name"),
548    )
549    roles_by_pk = ModelMultipleChoiceFilter(
550        field_name="roles",
551        queryset=Role.objects.all().order_by("name"),
552    )
553
554    def filter_is_superuser(self, queryset, name, value):
555        if value:
556            return queryset.filter(groups__is_superuser=True).distinct()
557        return queryset.exclude(groups__is_superuser=True).distinct()
558
559    def filter_attributes(self, queryset, name, value):
560        """Filter attributes by query args"""
561        try:
562            value = loads(value)
563        except ValueError:
564            raise ValidationError(_("filter: failed to parse JSON")) from None
565        if not isinstance(value, dict):
566            raise ValidationError(_("filter: value must be key:value mapping"))
567        qs = {}
568        for key, _value in value.items():
569            qs[f"attributes__{key}"] = _value
570        try:
571            __ = len(queryset.filter(**qs))
572            return queryset.filter(**qs)
573        except ValueError:
574            return queryset
575
576    class Meta:
577        model = User
578        fields = [
579            "username",
580            "email",
581            "date_joined",
582            "last_updated",
583            "last_login",
584            "name",
585            "is_active",
586            "is_superuser",
587            "attributes",
588            "groups_by_name",
589            "groups_by_pk",
590            "roles_by_name",
591            "roles_by_pk",
592            "type",
593        ]

Filter for users

attributes
date_joined__lt
date_joined
date_joined__gt
last_updated__lt
last_updated
last_updated__gt
last_login__lt
last_login
last_login__gt
last_login__isnull
is_superuser
uuid
path
path_startswith
type
groups_by_name
groups_by_pk
roles_by_name
roles_by_pk
def filter_is_superuser(self, queryset, name, value):
554    def filter_is_superuser(self, queryset, name, value):
555        if value:
556            return queryset.filter(groups__is_superuser=True).distinct()
557        return queryset.exclude(groups__is_superuser=True).distinct()
def filter_attributes(self, queryset, name, value):
559    def filter_attributes(self, queryset, name, value):
560        """Filter attributes by query args"""
561        try:
562            value = loads(value)
563        except ValueError:
564            raise ValidationError(_("filter: failed to parse JSON")) from None
565        if not isinstance(value, dict):
566            raise ValidationError(_("filter: value must be key:value mapping"))
567        qs = {}
568        for key, _value in value.items():
569            qs[f"attributes__{key}"] = _value
570        try:
571            __ = len(queryset.filter(**qs))
572            return queryset.filter(**qs)
573        except ValueError:
574            return queryset

Filter attributes by query args

declared_filters = OrderedDict({'attributes': <django_filters.filters.CharFilter object>, 'date_joined__lt': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__isnull': <django_filters.filters.BooleanFilter object>, 'is_superuser': <django_filters.filters.BooleanFilter object>, 'uuid': <django_filters.filters.UUIDFilter object>, 'path': <django_filters.filters.CharFilter object>, 'path_startswith': <django_filters.filters.CharFilter object>, 'type': <django_filters.filters.MultipleChoiceFilter object>, 'groups_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'groups_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
base_filters = OrderedDict({'username': <django_filters.filters.CharFilter object>, 'email': <django_filters.filters.CharFilter object>, 'date_joined': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated': <django_filters.filters.IsoDateTimeFilter object>, 'last_login': <django_filters.filters.IsoDateTimeFilter object>, 'name': <django_filters.filters.CharFilter object>, 'is_active': <django_filters.filters.BooleanFilter object>, 'is_superuser': <django_filters.filters.BooleanFilter object>, 'attributes': <django_filters.filters.CharFilter object>, 'groups_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'groups_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'type': <django_filters.filters.MultipleChoiceFilter object>, 'date_joined__lt': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__isnull': <django_filters.filters.BooleanFilter object>, 'uuid': <django_filters.filters.UUIDFilter object>, 'path': <django_filters.filters.CharFilter object>, 'path_startswith': <django_filters.filters.CharFilter object>})
class UsersFilter.Meta:
576    class Meta:
577        model = User
578        fields = [
579            "username",
580            "email",
581            "date_joined",
582            "last_updated",
583            "last_login",
584            "name",
585            "is_active",
586            "is_superuser",
587            "attributes",
588            "groups_by_name",
589            "groups_by_pk",
590            "roles_by_name",
591            "roles_by_pk",
592            "type",
593        ]
model = <class 'authentik.core.models.User'>
fields = ['username', 'email', 'date_joined', 'last_updated', 'last_login', 'name', 'is_active', 'is_superuser', 'attributes', 'groups_by_name', 'groups_by_pk', 'roles_by_name', 'roles_by_pk', 'type']
 596class UserViewSet(
 597    ConditionalInheritance(
 598        "authentik.enterprise.stages.account_lockdown.api.UserAccountLockdownMixin"
 599    ),
 600    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
 601    UsedByMixin,
 602    ModelViewSet,
 603):
 604    """User Viewset"""
 605
 606    queryset = User.objects.none()
 607    ordering = ["username", "date_joined", "last_updated", "last_login"]
 608    serializer_class = UserSerializer
 609    filterset_class = UsersFilter
 610    search_fields = ["email", "name", "uuid", "username"]
 611    authentication_classes = [
 612        TokenAuthentication,
 613        SessionAuthentication,
 614        AgentAuth,
 615    ]
 616
 617    def get_ql_fields(self):
 618        return [
 619            StrField(User, "username"),
 620            StrField(User, "name"),
 621            StrField(User, "email"),
 622            StrField(User, "path"),
 623            BoolField(User, "is_active", nullable=True),
 624            ChoiceSearchField(User, "type"),
 625            JSONSearchField(User, "attributes"),
 626        ]
 627
 628    def get_queryset(self):
 629        base_qs = User.objects.all().exclude_anonymous()
 630        # Always prefetch groups since group PKs are always serialized.
 631        # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise.
 632        if self.serializer_class(context={"request": self.request})._should_include_groups:
 633            base_qs = base_qs.prefetch_related("groups")
 634        else:
 635            base_qs = base_qs.prefetch_related(
 636                Prefetch("groups", queryset=Group.objects.all().only("group_uuid"))
 637            )
 638        if self.serializer_class(context={"request": self.request})._should_include_roles:
 639            base_qs = base_qs.prefetch_related("roles")
 640        else:
 641            base_qs = base_qs.prefetch_related(
 642                Prefetch("roles", queryset=Role.objects.all().only("uuid"))
 643            )
 644        # Annotate is_superuser to avoid N+1 query per user
 645        base_qs = base_qs.annotate(
 646            _annotated_is_superuser=Exists(
 647                Group.objects.filter(
 648                    is_superuser=True,
 649                ).filter(
 650                    Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk"))
 651                )
 652            )
 653        )
 654        return base_qs
 655
 656    @extend_schema(
 657        parameters=[
 658            OpenApiParameter("include_groups", bool, default=True),
 659            OpenApiParameter("include_roles", bool, default=True),
 660        ]
 661    )
 662    def list(self, request, *args, **kwargs):
 663        return super().list(request, *args, **kwargs)
 664
 665    def _create_recovery_link(
 666        self, token_duration: str | None, for_email=False
 667    ) -> tuple[str, Token]:
 668        """Create a recovery link (when the current brand has a recovery flow set),
 669        that can either be shown to an admin or sent to the user directly"""
 670        brand: Brand = self.request.brand
 671        # Check that there is a recovery flow, if not return an error
 672        flow = brand.flow_recovery
 673        if not flow:
 674            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
 675        user: User = self.get_object()
 676        planner = FlowPlanner(flow)
 677        planner.allow_empty_flows = True
 678        self.request._request.user = AnonymousUser()
 679        try:
 680            plan = planner.plan(
 681                self.request._request,
 682                {
 683                    PLAN_CONTEXT_PENDING_USER: user,
 684                },
 685            )
 686        except FlowNonApplicableException:
 687            raise ValidationError(
 688                {"non_field_errors": _("Recovery flow not applicable to user")}
 689            ) from None
 690        _plan = FlowToken.pickle(plan)
 691        if for_email:
 692            _plan = pickle_flow_token_for_email(plan)
 693        expires = default_token_duration()
 694        if token_duration:
 695            timedelta_string_validator(token_duration)
 696            expires = now() + timedelta_from_string(token_duration)
 697        token, __ = FlowToken.objects.update_or_create(
 698            identifier=f"{user.uid}-password-reset",
 699            defaults={
 700                "user": user,
 701                "flow": flow,
 702                "_plan": _plan,
 703                "revoke_on_execution": not for_email,
 704                "expires": expires,
 705            },
 706        )
 707        querystring = urlencode({QS_KEY_TOKEN: token.key})
 708        link = self.request.build_absolute_uri(
 709            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
 710            + f"?{querystring}"
 711        )
 712        return link, token
 713
 714    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
 715    @extend_schema(
 716        request=UserServiceAccountSerializer,
 717        responses={
 718            200: inline_serializer(
 719                "UserServiceAccountResponse",
 720                {
 721                    "username": CharField(required=True),
 722                    "token": CharField(required=True),
 723                    "user_uid": CharField(required=True),
 724                    "user_pk": IntegerField(required=True),
 725                    "group_pk": CharField(required=False),
 726                },
 727            )
 728        },
 729    )
 730    @action(
 731        detail=False,
 732        methods=["POST"],
 733        pagination_class=None,
 734        filter_backends=[],
 735    )
 736    @validate(UserServiceAccountSerializer)
 737    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
 738        """Create a new user account that is marked as a service account"""
 739        expires = body.validated_data.get("expires", now() + timedelta(days=360))
 740
 741        username = body.validated_data["name"]
 742        expiring = body.validated_data["expiring"]
 743        with atomic():
 744            try:
 745                user: User = User.objects.create(
 746                    username=username,
 747                    name=username,
 748                    type=UserTypes.SERVICE_ACCOUNT,
 749                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
 750                    path=USER_PATH_SERVICE_ACCOUNT,
 751                )
 752                user.set_unusable_password()
 753                user.save()
 754
 755                response = {
 756                    "username": user.username,
 757                    "user_uid": user.uid,
 758                    "user_pk": user.pk,
 759                }
 760                if body.validated_data["create_group"] and self.request.user.has_perm(
 761                    "authentik_core.add_group"
 762                ):
 763                    group = Group.objects.create(name=username)
 764                    group.users.add(user)
 765                    response["group_pk"] = str(group.pk)
 766                token = Token.objects.create(
 767                    identifier=slugify(f"service-account-{username}-password"),
 768                    intent=TokenIntents.INTENT_APP_PASSWORD,
 769                    user=user,
 770                    expires=expires,
 771                    expiring=expiring,
 772                )
 773                response["token"] = token.key
 774                return Response(response)
 775            except IntegrityError as exc:
 776                error_msg = str(exc).lower()
 777
 778                if "unique" in error_msg:
 779                    return Response(
 780                        data={
 781                            "non_field_errors": [
 782                                _("A user/group with these details already exists")
 783                            ]
 784                        },
 785                        status=400,
 786                    )
 787                else:
 788                    LOGGER.warning("Service account creation failed", exc=exc)
 789                    return Response(
 790                        data={"non_field_errors": [_("Unable to create user")]},
 791                        status=400,
 792                    )
 793            except (ValueError, TypeError) as exc:
 794                LOGGER.error("Unexpected error during service account creation", exc=exc)
 795                return Response(
 796                    data={"non_field_errors": [_("Unknown error occurred")]},
 797                    status=500,
 798                )
 799
 800    @extend_schema(responses={200: SessionUserSerializer(many=False)})
 801    @action(
 802        url_path="me",
 803        url_name="me",
 804        detail=False,
 805        pagination_class=None,
 806        filter_backends=[],
 807    )
 808    def user_me(self, request: Request) -> Response:
 809        """Get information about current user"""
 810        context = {"request": request}
 811        serializer = SessionUserSerializer(
 812            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
 813        )
 814        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
 815            serializer.initial_data["original"] = UserSelfSerializer(
 816                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
 817                context=context,
 818            ).data
 819        self.request.session.modified = True
 820        return Response(serializer.initial_data)
 821
 822    def _update_session_hash_after_password_change(self, request: Request, user: User):
 823        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
 824            LOGGER.debug("Updating session hash after password change")
 825            update_session_auth_hash(self.request, user)
 826
 827    @permission_required("authentik_core.reset_user_password")
 828    @extend_schema(
 829        request=UserPasswordSetSerializer,
 830        responses={
 831            204: OpenApiResponse(description="Successfully changed password"),
 832            400: OpenApiResponse(description="Bad request"),
 833        },
 834    )
 835    @action(
 836        detail=True,
 837        methods=["POST"],
 838        permission_classes=[IsAuthenticated],
 839    )
 840    @validate(UserPasswordSetSerializer)
 841    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
 842        """Set password for user"""
 843        user: User = self.get_object()
 844        try:
 845            user.set_password(body.validated_data["password"], request=request)
 846            user.save()
 847        except (ValidationError, IntegrityError) as exc:
 848            LOGGER.debug("Failed to set password", exc=exc)
 849            return Response(status=400)
 850        self._update_session_hash_after_password_change(request, user)
 851        return Response(status=204)
 852
 853    @permission_required("authentik_core.reset_user_password")
 854    @extend_schema(
 855        request=UserPasswordHashSetSerializer,
 856        responses={
 857            204: OpenApiResponse(description="Successfully changed password"),
 858            400: OpenApiResponse(description="Bad request"),
 859        },
 860    )
 861    @action(
 862        detail=True,
 863        methods=["POST"],
 864        permission_classes=[IsAuthenticated],
 865    )
 866    @validate(UserPasswordHashSetSerializer)
 867    def set_password_hash(
 868        self, request: Request, pk: int, body: UserPasswordHashSetSerializer
 869    ) -> Response:
 870        """Set a user's password from a pre-hashed Django password value.
 871
 872        Submit the Django password hash in the shared ``password`` request field.
 873
 874        This updates authentik's local password verifier only. It does not attempt
 875        to propagate the password change to LDAP or Kerberos because no raw password
 876        is available from the request payload.
 877        """
 878        user: User = self.get_object()
 879        try:
 880            user.set_password_from_hash(body.validated_data["password"], request=request)
 881            user.save()
 882        except ValueError as exc:
 883            LOGGER.debug("Failed to set password hash", exc=exc)
 884            return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400)
 885        except (ValidationError, IntegrityError) as exc:
 886            LOGGER.debug("Failed to set password hash", exc=exc)
 887            return Response(status=400)
 888        self._update_session_hash_after_password_change(request, user)
 889        return Response(status=204)
 890
 891    @permission_required("authentik_core.reset_user_password")
 892    @extend_schema(
 893        request=UserRecoveryLinkSerializer,
 894        responses={
 895            "200": LinkSerializer(many=False),
 896        },
 897    )
 898    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
 899    @validate(UserRecoveryLinkSerializer)
 900    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
 901        """Create a temporary link that a user can use to recover their account"""
 902        link, _ = self._create_recovery_link(
 903            token_duration=body.validated_data.get("token_duration")
 904        )
 905        return Response({"link": link})
 906
 907    @permission_required("authentik_core.reset_user_password")
 908    @extend_schema(
 909        request=UserRecoveryEmailSerializer,
 910        responses={
 911            "204": OpenApiResponse(description="Successfully sent recover email"),
 912        },
 913    )
 914    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
 915    @validate(UserRecoveryEmailSerializer)
 916    def recovery_email(
 917        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
 918    ) -> Response:
 919        """Send an email with a temporary link that a user can use to recover their account"""
 920        email_error_message = _("User does not have an email address set.")
 921        stage_error_message = _("Email stage not found.")
 922        user: User = self.get_object()
 923        if not user.email:
 924            LOGGER.debug("User doesn't have an email address")
 925            raise ValidationError({"non_field_errors": email_error_message})
 926        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
 927            LOGGER.debug("Email stage does not exist")
 928            raise ValidationError({"non_field_errors": stage_error_message})
 929        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
 930            LOGGER.debug("User has no view access to email stage")
 931            raise ValidationError({"non_field_errors": stage_error_message})
 932        link, token = self._create_recovery_link(
 933            token_duration=body.validated_data.get("token_duration"), for_email=True
 934        )
 935        message = TemplateEmailMessage(
 936            subject=_(stage.subject),
 937            to=[(user.name, user.email)],
 938            template_name=stage.template,
 939            language=user.locale(request),
 940            template_context={
 941                "url": link,
 942                "user": user,
 943                "expires": token.expires,
 944            },
 945        )
 946        send_mails(stage, message)
 947        return Response(status=204)
 948
 949    @permission_required("authentik_core.impersonate")
 950    @extend_schema(
 951        request=inline_serializer(
 952            "ImpersonationSerializer",
 953            {
 954                "reason": CharField(required=True),
 955            },
 956        ),
 957        responses={
 958            204: OpenApiResponse(description="Successfully started impersonation"),
 959        },
 960    )
 961    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
 962    def impersonate(self, request: Request, pk: int) -> Response:
 963        """Impersonate a user"""
 964        if not request.tenant.impersonation:
 965            LOGGER.debug("User attempted to impersonate", user=request.user)
 966            return Response(status=401)
 967        user_to_be = self.get_object()
 968        reason = request.data.get("reason", "")
 969        # Check both object-level perms and global perms
 970        if not request.user.has_perm(
 971            "authentik_core.impersonate", user_to_be
 972        ) and not request.user.has_perm("authentik_core.impersonate"):
 973            LOGGER.debug(
 974                "User attempted to impersonate without permissions",
 975                user=request.user,
 976            )
 977            return Response(status=403)
 978        if user_to_be.pk == self.request.user.pk:
 979            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
 980            return Response(status=401)
 981        if not reason and request.tenant.impersonation_require_reason:
 982            LOGGER.debug(
 983                "User attempted to impersonate without providing a reason",
 984                user=request.user,
 985            )
 986            raise ValidationError({"reason": _("This field is required.")})
 987
 988        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
 989        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
 990
 991        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
 992
 993        return Response(status=204)
 994
 995    @extend_schema(
 996        request=None,
 997        responses={
 998            "204": OpenApiResponse(description="Successfully ended impersonation"),
 999        },
1000    )
1001    @action(detail=False, methods=["GET"])
1002    def impersonate_end(self, request: Request) -> Response:
1003        """End Impersonation a user"""
1004        if (
1005            SESSION_KEY_IMPERSONATE_USER not in request.session
1006            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
1007        ):
1008            LOGGER.debug("Can't end impersonation", user=request.user)
1009            return Response(status=204)
1010
1011        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1012
1013        del request.session[SESSION_KEY_IMPERSONATE_USER]
1014        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1015
1016        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
1017
1018        return Response(status=204)
1019
1020    @extend_schema(
1021        responses={
1022            200: inline_serializer(
1023                "UserPathSerializer",
1024                {"paths": ListField(child=CharField(), read_only=True)},
1025            )
1026        },
1027        parameters=[
1028            OpenApiParameter(
1029                name="search",
1030                location=OpenApiParameter.QUERY,
1031                type=OpenApiTypes.STR,
1032            )
1033        ],
1034    )
1035    @action(detail=False, pagination_class=None)
1036    def paths(self, request: Request) -> Response:
1037        """Get all user paths"""
1038        return Response(
1039            data={
1040                "paths": list(
1041                    self.filter_queryset(self.get_queryset())
1042                    .values("path")
1043                    .distinct()
1044                    .order_by("path")
1045                    .values_list("path", flat=True)
1046                )
1047            }
1048        )
1049
1050    def partial_update(self, request: Request, *args, **kwargs) -> Response:
1051        response = super().partial_update(request, *args, **kwargs)
1052        instance: User = self.get_object()
1053        if not instance.is_active:
1054            Session.objects.filter(authenticatedsession__user=instance).delete()
1055            LOGGER.debug("Deleted user's sessions", user=instance.username)
1056        return response

User Viewset

queryset = <UserQuerySet []>
ordering = ['username', 'date_joined', 'last_updated', 'last_login']
serializer_class = <class 'UserSerializer'>
filterset_class = <class 'UsersFilter'>
search_fields = ['email', 'name', 'uuid', 'username']
authentication_classes = [<class 'authentik.api.authentication.TokenAuthentication'>, <class 'rest_framework.authentication.SessionAuthentication'>, <class 'authentik.endpoints.connectors.agent.auth.AgentAuth'>]
def get_ql_fields(self):
617    def get_ql_fields(self):
618        return [
619            StrField(User, "username"),
620            StrField(User, "name"),
621            StrField(User, "email"),
622            StrField(User, "path"),
623            BoolField(User, "is_active", nullable=True),
624            ChoiceSearchField(User, "type"),
625            JSONSearchField(User, "attributes"),
626        ]
def get_queryset(self):
628    def get_queryset(self):
629        base_qs = User.objects.all().exclude_anonymous()
630        # Always prefetch groups since group PKs are always serialized.
631        # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise.
632        if self.serializer_class(context={"request": self.request})._should_include_groups:
633            base_qs = base_qs.prefetch_related("groups")
634        else:
635            base_qs = base_qs.prefetch_related(
636                Prefetch("groups", queryset=Group.objects.all().only("group_uuid"))
637            )
638        if self.serializer_class(context={"request": self.request})._should_include_roles:
639            base_qs = base_qs.prefetch_related("roles")
640        else:
641            base_qs = base_qs.prefetch_related(
642                Prefetch("roles", queryset=Role.objects.all().only("uuid"))
643            )
644        # Annotate is_superuser to avoid N+1 query per user
645        base_qs = base_qs.annotate(
646            _annotated_is_superuser=Exists(
647                Group.objects.filter(
648                    is_superuser=True,
649                ).filter(
650                    Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk"))
651                )
652            )
653        )
654        return base_qs

Get the list of items for this view. This must be an iterable, and may be a queryset. Defaults to using self.queryset.

This method should always be used rather than accessing self.queryset directly, as self.queryset gets evaluated only once, and those results are cached for all subsequent requests.

You may want to override this if you need to provide different querysets depending on the incoming request.

(Eg. return a list of items that is specific to the user)

@extend_schema(parameters=[OpenApiParameter('include_groups', bool, default=True), OpenApiParameter('include_roles', bool, default=True)])
def list(self, request, *args, **kwargs):
656    @extend_schema(
657        parameters=[
658            OpenApiParameter("include_groups", bool, default=True),
659            OpenApiParameter("include_roles", bool, default=True),
660        ]
661    )
662    def list(self, request, *args, **kwargs):
663        return super().list(request, *args, **kwargs)
@permission_required(None, ['authentik_core.add_user', 'authentik_core.add_token'])
@extend_schema(request=UserServiceAccountSerializer, responses={200: inline_serializer('UserServiceAccountResponse', {'username': CharField(required=True), 'token': CharField(required=True), 'user_uid': CharField(required=True), 'user_pk': IntegerField(required=True), 'group_pk': CharField(required=False)})})
@action(detail=False, methods=['POST'], pagination_class=None, filter_backends=[])
@validate(UserServiceAccountSerializer)
def service_account( self, request: rest_framework.request.Request, body: UserServiceAccountSerializer) -> rest_framework.response.Response:
714    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
715    @extend_schema(
716        request=UserServiceAccountSerializer,
717        responses={
718            200: inline_serializer(
719                "UserServiceAccountResponse",
720                {
721                    "username": CharField(required=True),
722                    "token": CharField(required=True),
723                    "user_uid": CharField(required=True),
724                    "user_pk": IntegerField(required=True),
725                    "group_pk": CharField(required=False),
726                },
727            )
728        },
729    )
730    @action(
731        detail=False,
732        methods=["POST"],
733        pagination_class=None,
734        filter_backends=[],
735    )
736    @validate(UserServiceAccountSerializer)
737    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
738        """Create a new user account that is marked as a service account"""
739        expires = body.validated_data.get("expires", now() + timedelta(days=360))
740
741        username = body.validated_data["name"]
742        expiring = body.validated_data["expiring"]
743        with atomic():
744            try:
745                user: User = User.objects.create(
746                    username=username,
747                    name=username,
748                    type=UserTypes.SERVICE_ACCOUNT,
749                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
750                    path=USER_PATH_SERVICE_ACCOUNT,
751                )
752                user.set_unusable_password()
753                user.save()
754
755                response = {
756                    "username": user.username,
757                    "user_uid": user.uid,
758                    "user_pk": user.pk,
759                }
760                if body.validated_data["create_group"] and self.request.user.has_perm(
761                    "authentik_core.add_group"
762                ):
763                    group = Group.objects.create(name=username)
764                    group.users.add(user)
765                    response["group_pk"] = str(group.pk)
766                token = Token.objects.create(
767                    identifier=slugify(f"service-account-{username}-password"),
768                    intent=TokenIntents.INTENT_APP_PASSWORD,
769                    user=user,
770                    expires=expires,
771                    expiring=expiring,
772                )
773                response["token"] = token.key
774                return Response(response)
775            except IntegrityError as exc:
776                error_msg = str(exc).lower()
777
778                if "unique" in error_msg:
779                    return Response(
780                        data={
781                            "non_field_errors": [
782                                _("A user/group with these details already exists")
783                            ]
784                        },
785                        status=400,
786                    )
787                else:
788                    LOGGER.warning("Service account creation failed", exc=exc)
789                    return Response(
790                        data={"non_field_errors": [_("Unable to create user")]},
791                        status=400,
792                    )
793            except (ValueError, TypeError) as exc:
794                LOGGER.error("Unexpected error during service account creation", exc=exc)
795                return Response(
796                    data={"non_field_errors": [_("Unknown error occurred")]},
797                    status=500,
798                )

Create a new user account that is marked as a service account

@extend_schema(responses={200: SessionUserSerializer(many=False)})
@action(url_path='me', url_name='me', detail=False, pagination_class=None, filter_backends=[])
def user_me( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
800    @extend_schema(responses={200: SessionUserSerializer(many=False)})
801    @action(
802        url_path="me",
803        url_name="me",
804        detail=False,
805        pagination_class=None,
806        filter_backends=[],
807    )
808    def user_me(self, request: Request) -> Response:
809        """Get information about current user"""
810        context = {"request": request}
811        serializer = SessionUserSerializer(
812            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
813        )
814        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
815            serializer.initial_data["original"] = UserSelfSerializer(
816                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
817                context=context,
818            ).data
819        self.request.session.modified = True
820        return Response(serializer.initial_data)

Get information about current user

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserPasswordSetSerializer, responses={204: OpenApiResponse(description='Successfully changed password'), 400: OpenApiResponse(description='Bad request')})
@action(detail=True, methods=['POST'], permission_classes=[IsAuthenticated])
@validate(UserPasswordSetSerializer)
def set_password( self, request: rest_framework.request.Request, pk: int, body: UserPasswordSetSerializer) -> rest_framework.response.Response:
827    @permission_required("authentik_core.reset_user_password")
828    @extend_schema(
829        request=UserPasswordSetSerializer,
830        responses={
831            204: OpenApiResponse(description="Successfully changed password"),
832            400: OpenApiResponse(description="Bad request"),
833        },
834    )
835    @action(
836        detail=True,
837        methods=["POST"],
838        permission_classes=[IsAuthenticated],
839    )
840    @validate(UserPasswordSetSerializer)
841    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
842        """Set password for user"""
843        user: User = self.get_object()
844        try:
845            user.set_password(body.validated_data["password"], request=request)
846            user.save()
847        except (ValidationError, IntegrityError) as exc:
848            LOGGER.debug("Failed to set password", exc=exc)
849            return Response(status=400)
850        self._update_session_hash_after_password_change(request, user)
851        return Response(status=204)

Set password for user

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserPasswordHashSetSerializer, responses={204: OpenApiResponse(description='Successfully changed password'), 400: OpenApiResponse(description='Bad request')})
@action(detail=True, methods=['POST'], permission_classes=[IsAuthenticated])
@validate(UserPasswordHashSetSerializer)
def set_password_hash( self, request: rest_framework.request.Request, pk: int, body: UserPasswordHashSetSerializer) -> rest_framework.response.Response:
853    @permission_required("authentik_core.reset_user_password")
854    @extend_schema(
855        request=UserPasswordHashSetSerializer,
856        responses={
857            204: OpenApiResponse(description="Successfully changed password"),
858            400: OpenApiResponse(description="Bad request"),
859        },
860    )
861    @action(
862        detail=True,
863        methods=["POST"],
864        permission_classes=[IsAuthenticated],
865    )
866    @validate(UserPasswordHashSetSerializer)
867    def set_password_hash(
868        self, request: Request, pk: int, body: UserPasswordHashSetSerializer
869    ) -> Response:
870        """Set a user's password from a pre-hashed Django password value.
871
872        Submit the Django password hash in the shared ``password`` request field.
873
874        This updates authentik's local password verifier only. It does not attempt
875        to propagate the password change to LDAP or Kerberos because no raw password
876        is available from the request payload.
877        """
878        user: User = self.get_object()
879        try:
880            user.set_password_from_hash(body.validated_data["password"], request=request)
881            user.save()
882        except ValueError as exc:
883            LOGGER.debug("Failed to set password hash", exc=exc)
884            return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400)
885        except (ValidationError, IntegrityError) as exc:
886            LOGGER.debug("Failed to set password hash", exc=exc)
887            return Response(status=400)
888        self._update_session_hash_after_password_change(request, user)
889        return Response(status=204)

Set a user's password from a pre-hashed Django password value.

Submit the Django password hash in the shared password request field.

This updates authentik's local password verifier only. It does not attempt to propagate the password change to LDAP or Kerberos because no raw password is available from the request payload.

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserRecoveryLinkSerializer, responses={'200': LinkSerializer(many=False)})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(UserRecoveryLinkSerializer)
def recovery( self, request: rest_framework.request.Request, pk: int, body: UserRecoveryLinkSerializer) -> rest_framework.response.Response:
891    @permission_required("authentik_core.reset_user_password")
892    @extend_schema(
893        request=UserRecoveryLinkSerializer,
894        responses={
895            "200": LinkSerializer(many=False),
896        },
897    )
898    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
899    @validate(UserRecoveryLinkSerializer)
900    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
901        """Create a temporary link that a user can use to recover their account"""
902        link, _ = self._create_recovery_link(
903            token_duration=body.validated_data.get("token_duration")
904        )
905        return Response({"link": link})

Create a temporary link that a user can use to recover their account

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserRecoveryEmailSerializer, responses={'204': OpenApiResponse(description='Successfully sent recover email')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(UserRecoveryEmailSerializer)
def recovery_email( self, request: rest_framework.request.Request, pk: int, body: UserRecoveryEmailSerializer) -> rest_framework.response.Response:
907    @permission_required("authentik_core.reset_user_password")
908    @extend_schema(
909        request=UserRecoveryEmailSerializer,
910        responses={
911            "204": OpenApiResponse(description="Successfully sent recover email"),
912        },
913    )
914    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
915    @validate(UserRecoveryEmailSerializer)
916    def recovery_email(
917        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
918    ) -> Response:
919        """Send an email with a temporary link that a user can use to recover their account"""
920        email_error_message = _("User does not have an email address set.")
921        stage_error_message = _("Email stage not found.")
922        user: User = self.get_object()
923        if not user.email:
924            LOGGER.debug("User doesn't have an email address")
925            raise ValidationError({"non_field_errors": email_error_message})
926        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
927            LOGGER.debug("Email stage does not exist")
928            raise ValidationError({"non_field_errors": stage_error_message})
929        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
930            LOGGER.debug("User has no view access to email stage")
931            raise ValidationError({"non_field_errors": stage_error_message})
932        link, token = self._create_recovery_link(
933            token_duration=body.validated_data.get("token_duration"), for_email=True
934        )
935        message = TemplateEmailMessage(
936            subject=_(stage.subject),
937            to=[(user.name, user.email)],
938            template_name=stage.template,
939            language=user.locale(request),
940            template_context={
941                "url": link,
942                "user": user,
943                "expires": token.expires,
944            },
945        )
946        send_mails(stage, message)
947        return Response(status=204)

Send an email with a temporary link that a user can use to recover their account

@permission_required('authentik_core.impersonate')
@extend_schema(request=inline_serializer('ImpersonationSerializer', {'reason': CharField(required=True)}), responses={204: OpenApiResponse(description='Successfully started impersonation')})
@action(detail=True, methods=['POST'], permission_classes=[IsAuthenticated])
def impersonate( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
949    @permission_required("authentik_core.impersonate")
950    @extend_schema(
951        request=inline_serializer(
952            "ImpersonationSerializer",
953            {
954                "reason": CharField(required=True),
955            },
956        ),
957        responses={
958            204: OpenApiResponse(description="Successfully started impersonation"),
959        },
960    )
961    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
962    def impersonate(self, request: Request, pk: int) -> Response:
963        """Impersonate a user"""
964        if not request.tenant.impersonation:
965            LOGGER.debug("User attempted to impersonate", user=request.user)
966            return Response(status=401)
967        user_to_be = self.get_object()
968        reason = request.data.get("reason", "")
969        # Check both object-level perms and global perms
970        if not request.user.has_perm(
971            "authentik_core.impersonate", user_to_be
972        ) and not request.user.has_perm("authentik_core.impersonate"):
973            LOGGER.debug(
974                "User attempted to impersonate without permissions",
975                user=request.user,
976            )
977            return Response(status=403)
978        if user_to_be.pk == self.request.user.pk:
979            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
980            return Response(status=401)
981        if not reason and request.tenant.impersonation_require_reason:
982            LOGGER.debug(
983                "User attempted to impersonate without providing a reason",
984                user=request.user,
985            )
986            raise ValidationError({"reason": _("This field is required.")})
987
988        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
989        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
990
991        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
992
993        return Response(status=204)

Impersonate a user

@extend_schema(request=None, responses={'204': OpenApiResponse(description='Successfully ended impersonation')})
@action(detail=False, methods=['GET'])
def impersonate_end( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
 995    @extend_schema(
 996        request=None,
 997        responses={
 998            "204": OpenApiResponse(description="Successfully ended impersonation"),
 999        },
1000    )
1001    @action(detail=False, methods=["GET"])
1002    def impersonate_end(self, request: Request) -> Response:
1003        """End Impersonation a user"""
1004        if (
1005            SESSION_KEY_IMPERSONATE_USER not in request.session
1006            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
1007        ):
1008            LOGGER.debug("Can't end impersonation", user=request.user)
1009            return Response(status=204)
1010
1011        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1012
1013        del request.session[SESSION_KEY_IMPERSONATE_USER]
1014        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
1015
1016        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
1017
1018        return Response(status=204)

End Impersonation a user

@extend_schema(responses={200: inline_serializer('UserPathSerializer', {'paths': ListField(child=CharField(), read_only=True)})}, parameters=[OpenApiParameter(name='search', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None)
def paths( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
1020    @extend_schema(
1021        responses={
1022            200: inline_serializer(
1023                "UserPathSerializer",
1024                {"paths": ListField(child=CharField(), read_only=True)},
1025            )
1026        },
1027        parameters=[
1028            OpenApiParameter(
1029                name="search",
1030                location=OpenApiParameter.QUERY,
1031                type=OpenApiTypes.STR,
1032            )
1033        ],
1034    )
1035    @action(detail=False, pagination_class=None)
1036    def paths(self, request: Request) -> Response:
1037        """Get all user paths"""
1038        return Response(
1039            data={
1040                "paths": list(
1041                    self.filter_queryset(self.get_queryset())
1042                    .values("path")
1043                    .distinct()
1044                    .order_by("path")
1045                    .values_list("path", flat=True)
1046                )
1047            }
1048        )

Get all user paths

def partial_update( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
1050    def partial_update(self, request: Request, *args, **kwargs) -> Response:
1051        response = super().partial_update(request, *args, **kwargs)
1052        instance: User = self.get_object()
1053        if not instance.is_active:
1054            Session.objects.filter(authenticatedsession__user=instance).delete()
1055            LOGGER.debug("Deleted user's sessions", user=instance.username)
1056        return response
name = None
description = None
suffix = None
detail = None
basename = None