authentik.core.api.users

User API Views

  1"""User API Views"""
  2
  3from datetime import timedelta
  4from json import loads
  5from typing import Any
  6
  7from django.contrib.auth import update_session_auth_hash
  8from django.contrib.auth.models import AnonymousUser, Permission
  9from django.db.transaction import atomic
 10from django.db.utils import IntegrityError
 11from django.urls import reverse_lazy
 12from django.utils.http import urlencode
 13from django.utils.text import slugify
 14from django.utils.timezone import now
 15from django.utils.translation import gettext as _
 16from django_filters.filters import (
 17    BooleanFilter,
 18    CharFilter,
 19    IsoDateTimeFilter,
 20    ModelMultipleChoiceFilter,
 21    MultipleChoiceFilter,
 22    UUIDFilter,
 23)
 24from django_filters.filterset import FilterSet
 25from drf_spectacular.types import OpenApiTypes
 26from drf_spectacular.utils import (
 27    OpenApiParameter,
 28    OpenApiResponse,
 29    extend_schema,
 30    extend_schema_field,
 31    inline_serializer,
 32)
 33from rest_framework.authentication import SessionAuthentication
 34from rest_framework.decorators import action
 35from rest_framework.exceptions import ValidationError
 36from rest_framework.fields import (
 37    BooleanField,
 38    CharField,
 39    ChoiceField,
 40    DateTimeField,
 41    IntegerField,
 42    ListField,
 43    SerializerMethodField,
 44    UUIDField,
 45)
 46from rest_framework.permissions import IsAuthenticated
 47from rest_framework.request import Request
 48from rest_framework.response import Response
 49from rest_framework.serializers import (
 50    ListSerializer,
 51    PrimaryKeyRelatedField,
 52)
 53from rest_framework.validators import UniqueValidator
 54from rest_framework.viewsets import ModelViewSet
 55from structlog.stdlib import get_logger
 56
 57from authentik.api.authentication import TokenAuthentication
 58from authentik.api.validation import validate
 59from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 60from authentik.brands.models import Brand
 61from authentik.core.api.used_by import UsedByMixin
 62from authentik.core.api.utils import (
 63    JSONDictField,
 64    LinkSerializer,
 65    ModelSerializer,
 66    PassiveSerializer,
 67)
 68from authentik.core.middleware import (
 69    SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
 70    SESSION_KEY_IMPERSONATE_USER,
 71)
 72from authentik.core.models import (
 73    USER_ATTRIBUTE_TOKEN_EXPIRING,
 74    USER_PATH_SERVICE_ACCOUNT,
 75    USERNAME_MAX_LENGTH,
 76    Group,
 77    Session,
 78    Token,
 79    TokenIntents,
 80    User,
 81    UserTypes,
 82    default_token_duration,
 83)
 84from authentik.endpoints.connectors.agent.auth import AgentAuth
 85from authentik.events.models import Event, EventAction
 86from authentik.flows.exceptions import FlowNonApplicableException
 87from authentik.flows.models import FlowToken
 88from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
 89from authentik.flows.views.executor import QS_KEY_TOKEN
 90from authentik.lib.avatars import get_avatar
 91from authentik.lib.utils.reflection import ConditionalInheritance
 92from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 93from authentik.rbac.api.roles import RoleSerializer
 94from authentik.rbac.decorators import permission_required
 95from authentik.rbac.models import Role, get_permission_choices
 96from authentik.stages.email.flow import pickle_flow_token_for_email
 97from authentik.stages.email.models import EmailStage
 98from authentik.stages.email.tasks import send_mails
 99from authentik.stages.email.utils import TemplateEmailMessage
100
101LOGGER = get_logger()
102
103
class ParamUserSerializer(PassiveSerializer):
    """Partial serializer for query parameters to select a user"""

    # Optional primary-key reference to a user. Candidates come from
    # `User.objects.all().exclude_anonymous()`.
    user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)
108
109
class PartialGroupSerializer(ModelSerializer):
    """Partial Group Serializer, does not include child relations."""

    # Optional on input; declared explicitly so it is handled as a JSON dict field.
    attributes = JSONDictField(required=False)

    class Meta:
        model = Group
        # Only scalar group data is exposed here; no user or child-group relations.
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "attributes",
        ]
124
125
class UserSerializer(ModelSerializer):
    """User Serializer"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    attributes = JSONDictField(required=False)
    # Writable list of group primary keys; `groups_obj` is the expanded, computed view.
    groups = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Group.objects.all().order_by("name"),
        default=list,
    )
    groups_obj = SerializerMethodField(allow_null=True)
    # Writable list of role primary keys; `roles_obj` is the expanded, computed view.
    roles = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Role.objects.all().order_by("name"),
        default=list,
    )
    roles_obj = SerializerMethodField(allow_null=True)
    uid = CharField(read_only=True)
    username = CharField(
        max_length=USERNAME_MAX_LENGTH,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )

    def _query_flag(self, name: str) -> bool:
        """Return whether the boolean query parameter `name` is switched on.

        The flag counts as enabled when there is no request in the serializer
        context or when the parameter is absent; otherwise only the value
        "true" (compared case-insensitively) enables it."""
        request: Request | None = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, "true")).lower() == "true"

    @property
    def _should_include_groups(self) -> bool:
        # Controlled by the `include_groups` query parameter.
        return self._query_flag("include_groups")

    @property
    def _should_include_roles(self) -> bool:
        # Controlled by the `include_roles` query parameter.
        return self._query_flag("include_roles")

    @extend_schema_field(PartialGroupSerializer(many=True))
    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
        # None (rather than an empty list) signals that groups were not requested.
        if not self._should_include_groups:
            return None
        return PartialGroupSerializer(instance.groups, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
        # None (rather than an empty list) signals that roles were not requested.
        if not self._should_include_roles:
            return None
        return RoleSerializer(instance.roles, many=True).data

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # `password` and `permissions` are only accepted in a blueprint context.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["password"] = CharField(required=False, allow_null=True)
            self.fields["permissions"] = ListField(
                required=False,
                child=ChoiceField(choices=get_permission_choices()),
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list[str]:
        """Remove `permissions` from `validated_data` and return the matching
        permissions found in the database as `app_label.codename` strings.

        Returns an empty list when no permissions were supplied."""
        requested = validated_data.pop("permissions", [])
        # NOTE(review): the lookup matches on codename only (the part after the
        # first dot), so an identical codename in another app would match too.
        # Confirm this is intended.
        perms_qs = Permission.objects.filter(
            codename__in=[x.split(".")[1] for x in requested]
        ).values_list("content_type__app_label", "codename")
        return [f"{ct}.{name}" for ct, name in perms_qs]

    def create(self, validated_data: dict) -> User:
        """If this serializer is used in the blueprint context, we allow for
        directly setting a password. However should be done via the `set_password`
        method instead of directly setting it like rest_framework."""
        # Both keys must be removed before the model serializer sees the data.
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance: User = super().create(validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def update(self, instance: User, validated_data: dict) -> User:
        """Same as `create` above, set the password directly if we're in a blueprint
        context"""
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance = super().update(instance, validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def _set_password(self, instance: User, password: str | None):
        """Set password of user if we're in a blueprint context, and if it's an empty
        string then use an unusable password"""
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context and password:
            instance.set_password(password)
            instance.save()
        # A user must never be stored with an empty password value.
        if not instance.password:
            instance.set_unusable_password()
            instance.save()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    def validate_path(self, path: str) -> str:
        """Validate path: no leading/trailing slash and no empty segments.

        An empty path is rejected as an empty segment instead of failing on
        an out-of-range index."""
        if path.startswith("/") or path.endswith("/"):
            raise ValidationError(_("No leading or trailing slashes allowed."))
        if any(segment == "" for segment in path.split("/")):
            raise ValidationError(_("No empty segments in user path allowed."))
        return path

    def validate_type(self, user_type: str) -> str:
        """Validate user type, internal_service_account is an internal value"""
        if (
            self.instance
            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
        ):
            raise ValidationError(_("Can't change internal service account to other user type."))
        # New users may not be created as internal service accounts through this serializer.
        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
            raise ValidationError(_("Setting a user to internal service account is not allowed."))
        return user_type

    def validate(self, attrs: dict) -> dict:
        # Existing internal service accounts are rejected for any update.
        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
            raise ValidationError(_("Can't modify internal service account users"))
        return super().validate(attrs)

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "last_login",
            "date_joined",
            "is_superuser",
            "groups",
            "groups_obj",
            "roles",
            "roles_obj",
            "email",
            "avatar",
            "attributes",
            "uid",
            "path",
            "type",
            "uuid",
            "password_change_date",
            "last_updated",
        ]
        extra_kwargs = {
            "name": {"allow_blank": True},
            "date_joined": {"read_only": True},
            "password_change_date": {"read_only": True},
        }
283
284
class UserSelfSerializer(ModelSerializer):
    """User Serializer for information a user can retrieve about themselves"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    groups = SerializerMethodField()
    roles = SerializerMethodField()
    uid = CharField(read_only=True)
    settings = SerializerMethodField()
    system_permissions = SerializerMethodField()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfGroups",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_groups(self, user: User) -> list[dict[str, Any]]:
        """Return only the group names a user is member of"""
        # Read from the object being serialized rather than `self.instance`,
        # and return a concrete list so the value can be consumed more than once.
        return [
            {
                "name": group.name,
                "pk": group.pk,
            }
            for group in user.all_groups().order_by("name")
        ]

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfRoles",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_roles(self, user: User) -> list[dict[str, Any]]:
        """Return only the roles a user is member of"""
        # Same approach as `get_groups`: use the passed object and return a list.
        return [
            {
                "name": role.name,
                "pk": role.pk,
            }
            for role in user.all_roles().order_by("name")
        ]

    def get_settings(self, user: User) -> dict[str, Any]:
        """Get user settings with brand and group settings applied"""
        # `self.context` (not the private `_context`) also resolves when this
        # serializer is nested inside another one. A request is still required.
        return user.group_attributes(self.context["request"]).get("settings", {})

    def get_system_permissions(self, user: User) -> list[str]:
        """Get all system permissions assigned to the user"""
        # Keep only permissions whose name starts with "authentik_rbac" and
        # strip everything up to the first dot.
        return [
            x.split(".", maxsplit=1)[1]
            for x in user.get_all_permissions()
            if x.startswith("authentik_rbac")
        ]

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "is_superuser",
            "groups",
            "roles",
            "email",
            "avatar",
            "uid",
            "settings",
            "type",
            "system_permissions",
        ]
        extra_kwargs = {
            "is_active": {"read_only": True},
            "name": {"allow_blank": True},
        }
371
372
class SessionUserSerializer(PassiveSerializer):
    """Response for the /user/me endpoint, returns the currently active user (as `user` property)
    and, if this user is being impersonated, the original user in the `original` property.
    """

    # The user the session currently acts as.
    user = UserSelfSerializer()
    # Optional; only filled in by `user_me` while an impersonation is active.
    original = UserSelfSerializer(required=False)
380
381
class UserPasswordSetSerializer(PassiveSerializer):
    """Payload to set a users' password directly"""

    # The new password; mandatory.
    password = CharField(required=True)
386
387
class UserServiceAccountSerializer(PassiveSerializer):
    """Payload to create a service account"""

    # Validated for uniqueness against existing users.
    # NOTE(review): the validator receives a User queryset but the field is called
    # `name`; presumably it is meant to check `username` - confirm which column is compared.
    name = CharField(
        required=True,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )
    # Whether a group with the same name should be created alongside the user.
    create_group = BooleanField(default=False)
    # Whether the generated token expires.
    expiring = BooleanField(default=True)
    # Optional explicit expiry for the generated token.
    expires = DateTimeField(
        required=False,
        help_text="If not provided, valid for 360 days",
    )
401
402
class UserRecoveryLinkSerializer(PassiveSerializer):
    """Payload to create a recovery link"""

    # Optional duration string; the recovery views pass it on as `token_duration`
    # when creating the link.
    token_duration = CharField(required=False)
407
408
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
    """Payload to create and email a recovery link"""

    # Primary key of the email stage used to send the message (inherits
    # `token_duration` from the parent serializer).
    email_stage = UUIDField()
413
414
class UsersFilter(FilterSet):
    """Filter for users"""

    # JSON-encoded mapping matched against `attributes`; see `filter_attributes`.
    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    # Exact / before / after filters for the three timestamp columns.
    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
    date_joined = IsoDateTimeFilter(field_name="date_joined")
    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")

    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
    last_updated = IsoDateTimeFilter(field_name="last_updated")
    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")

    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
    last_login = IsoDateTimeFilter(field_name="last_login")
    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")

    # Superuser status is derived from group membership; see `filter_is_superuser`.
    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
    uuid = UUIDFilter(field_name="uuid")

    path = CharFilter(field_name="path")
    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")

    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")

    groups_by_name = ModelMultipleChoiceFilter(
        field_name="groups__name",
        to_field_name="name",
        queryset=Group.objects.all().order_by("name"),
    )
    groups_by_pk = ModelMultipleChoiceFilter(
        field_name="groups",
        queryset=Group.objects.all().order_by("name"),
    )

    roles_by_name = ModelMultipleChoiceFilter(
        field_name="roles__name",
        to_field_name="name",
        queryset=Role.objects.all().order_by("name"),
    )
    roles_by_pk = ModelMultipleChoiceFilter(
        field_name="roles",
        queryset=Role.objects.all().order_by("name"),
    )

    def filter_is_superuser(self, queryset, name, value):
        # Users that are (or are not) in at least one group flagged `is_superuser`;
        # `distinct()` removes duplicates introduced by the group join.
        if not value:
            return queryset.exclude(groups__is_superuser=True).distinct()
        return queryset.filter(groups__is_superuser=True).distinct()

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args"""
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(_("filter: failed to parse JSON")) from None
        if not isinstance(parsed, dict):
            raise ValidationError(_("filter: value must be key:value mapping"))
        # Each top-level key becomes a lookup on the `attributes` field.
        lookups = {f"attributes__{key}": wanted for key, wanted in parsed.items()}
        try:
            # Force evaluation once so a ValueError caused by the lookups is raised
            # here; in that case the unfiltered queryset is returned.
            len(queryset.filter(**lookups))
            return queryset.filter(**lookups)
        except ValueError:
            return queryset

    class Meta:
        model = User
        fields = [
            "username",
            "email",
            "date_joined",
            "last_updated",
            "last_login",
            "name",
            "is_active",
            "is_superuser",
            "attributes",
            "groups_by_name",
            "groups_by_pk",
            "roles_by_name",
            "roles_by_pk",
            "type",
        ]
506
507
508class UserViewSet(
509    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
510    UsedByMixin,
511    ModelViewSet,
512):
513    """User Viewset"""
514
515    queryset = User.objects.none()
516    ordering = ["username", "date_joined", "last_updated", "last_login"]
517    serializer_class = UserSerializer
518    filterset_class = UsersFilter
519    search_fields = ["email", "name", "uuid", "username"]
520    authentication_classes = [
521        TokenAuthentication,
522        SessionAuthentication,
523        AgentAuth,
524    ]
525
526    def get_ql_fields(self):
527        from djangoql.schema import BoolField, StrField
528
529        from authentik.enterprise.search.fields import (
530            ChoiceSearchField,
531            JSONSearchField,
532        )
533
534        return [
535            StrField(User, "username"),
536            StrField(User, "name"),
537            StrField(User, "email"),
538            StrField(User, "path"),
539            BoolField(User, "is_active", nullable=True),
540            ChoiceSearchField(User, "type"),
541            JSONSearchField(User, "attributes"),
542        ]
543
544    def get_queryset(self):
545        base_qs = User.objects.all().exclude_anonymous()
546        if self.serializer_class(context={"request": self.request})._should_include_groups:
547            base_qs = base_qs.prefetch_related("groups")
548        if self.serializer_class(context={"request": self.request})._should_include_roles:
549            base_qs = base_qs.prefetch_related("roles")
550        return base_qs
551
    @extend_schema(
        parameters=[
            OpenApiParameter("include_groups", bool, default=True),
            OpenApiParameter("include_roles", bool, default=True),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Overridden only to attach the `include_groups` / `include_roles` query
        # parameters to the schema; the listing itself is the inherited behaviour.
        return super().list(request, *args, **kwargs)
560
561    def _create_recovery_link(
562        self, token_duration: str | None, for_email=False
563    ) -> tuple[str, Token]:
564        """Create a recovery link (when the current brand has a recovery flow set),
565        that can either be shown to an admin or sent to the user directly"""
566        brand: Brand = self.request.brand
567        # Check that there is a recovery flow, if not return an error
568        flow = brand.flow_recovery
569        if not flow:
570            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
571        user: User = self.get_object()
572        planner = FlowPlanner(flow)
573        planner.allow_empty_flows = True
574        self.request._request.user = AnonymousUser()
575        try:
576            plan = planner.plan(
577                self.request._request,
578                {
579                    PLAN_CONTEXT_PENDING_USER: user,
580                },
581            )
582        except FlowNonApplicableException:
583            raise ValidationError(
584                {"non_field_errors": _("Recovery flow not applicable to user")}
585            ) from None
586        _plan = FlowToken.pickle(plan)
587        if for_email:
588            _plan = pickle_flow_token_for_email(plan)
589        expires = default_token_duration()
590        if token_duration:
591            timedelta_string_validator(token_duration)
592            expires = now() + timedelta_from_string(token_duration)
593        token, __ = FlowToken.objects.update_or_create(
594            identifier=f"{user.uid}-password-reset",
595            defaults={
596                "user": user,
597                "flow": flow,
598                "_plan": _plan,
599                "revoke_on_execution": not for_email,
600                "expires": expires,
601            },
602        )
603        querystring = urlencode({QS_KEY_TOKEN: token.key})
604        link = self.request.build_absolute_uri(
605            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
606            + f"?{querystring}"
607        )
608        return link, token
609
610    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
611    @extend_schema(
612        request=UserServiceAccountSerializer,
613        responses={
614            200: inline_serializer(
615                "UserServiceAccountResponse",
616                {
617                    "username": CharField(required=True),
618                    "token": CharField(required=True),
619                    "user_uid": CharField(required=True),
620                    "user_pk": IntegerField(required=True),
621                    "group_pk": CharField(required=False),
622                },
623            )
624        },
625    )
626    @action(
627        detail=False,
628        methods=["POST"],
629        pagination_class=None,
630        filter_backends=[],
631    )
632    @validate(UserServiceAccountSerializer)
633    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
634        """Create a new user account that is marked as a service account"""
635        expires = body.validated_data.get("expires", now() + timedelta(days=360))
636
637        username = body.validated_data["name"]
638        expiring = body.validated_data["expiring"]
639        with atomic():
640            try:
641                user: User = User.objects.create(
642                    username=username,
643                    name=username,
644                    type=UserTypes.SERVICE_ACCOUNT,
645                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
646                    path=USER_PATH_SERVICE_ACCOUNT,
647                )
648                user.set_unusable_password()
649                user.save()
650
651                response = {
652                    "username": user.username,
653                    "user_uid": user.uid,
654                    "user_pk": user.pk,
655                }
656                if body.validated_data["create_group"] and self.request.user.has_perm(
657                    "authentik_core.add_group"
658                ):
659                    group = Group.objects.create(name=username)
660                    group.users.add(user)
661                    response["group_pk"] = str(group.pk)
662                token = Token.objects.create(
663                    identifier=slugify(f"service-account-{username}-password"),
664                    intent=TokenIntents.INTENT_APP_PASSWORD,
665                    user=user,
666                    expires=expires,
667                    expiring=expiring,
668                )
669                response["token"] = token.key
670                return Response(response)
671            except IntegrityError as exc:
672                error_msg = str(exc).lower()
673
674                if "unique" in error_msg:
675                    return Response(
676                        data={
677                            "non_field_errors": [
678                                _("A user/group with these details already exists")
679                            ]
680                        },
681                        status=400,
682                    )
683                else:
684                    LOGGER.warning("Service account creation failed", exc=exc)
685                    return Response(
686                        data={"non_field_errors": [_("Unable to create user")]},
687                        status=400,
688                    )
689            except (ValueError, TypeError) as exc:
690                LOGGER.error("Unexpected error during service account creation", exc=exc)
691                return Response(
692                    data={"non_field_errors": [_("Unknown error occurred")]},
693                    status=500,
694                )
695
696    @extend_schema(responses={200: SessionUserSerializer(many=False)})
697    @action(
698        url_path="me",
699        url_name="me",
700        detail=False,
701        pagination_class=None,
702        filter_backends=[],
703    )
704    def user_me(self, request: Request) -> Response:
705        """Get information about current user"""
706        context = {"request": request}
707        serializer = SessionUserSerializer(
708            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
709        )
710        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
711            serializer.initial_data["original"] = UserSelfSerializer(
712                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
713                context=context,
714            ).data
715        self.request.session.modified = True
716        return Response(serializer.initial_data)
717
718    @permission_required("authentik_core.reset_user_password")
719    @extend_schema(
720        request=UserPasswordSetSerializer,
721        responses={
722            204: OpenApiResponse(description="Successfully changed password"),
723            400: OpenApiResponse(description="Bad request"),
724        },
725    )
726    @action(
727        detail=True,
728        methods=["POST"],
729        permission_classes=[IsAuthenticated],
730    )
731    @validate(UserPasswordSetSerializer)
732    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
733        """Set password for user"""
734        user: User = self.get_object()
735        try:
736            user.set_password(body.validated_data["password"], request=request)
737            user.save()
738        except (ValidationError, IntegrityError) as exc:
739            LOGGER.debug("Failed to set password", exc=exc)
740            return Response(status=400)
741        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
742            LOGGER.debug("Updating session hash after password change")
743            update_session_auth_hash(self.request, user)
744        return Response(status=204)
745
746    @permission_required("authentik_core.reset_user_password")
747    @extend_schema(
748        request=UserRecoveryLinkSerializer,
749        responses={
750            "200": LinkSerializer(many=False),
751        },
752    )
753    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
754    @validate(UserRecoveryLinkSerializer)
755    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
756        """Create a temporary link that a user can use to recover their account"""
757        link, _ = self._create_recovery_link(
758            token_duration=body.validated_data.get("token_duration")
759        )
760        return Response({"link": link})
761
762    @permission_required("authentik_core.reset_user_password")
763    @extend_schema(
764        request=UserRecoveryEmailSerializer,
765        responses={
766            "204": OpenApiResponse(description="Successfully sent recover email"),
767        },
768    )
769    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
770    @validate(UserRecoveryEmailSerializer)
771    def recovery_email(
772        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
773    ) -> Response:
774        """Send an email with a temporary link that a user can use to recover their account"""
775        email_error_message = _("User does not have an email address set.")
776        stage_error_message = _("Email stage not found.")
777        user: User = self.get_object()
778        if not user.email:
779            LOGGER.debug("User doesn't have an email address")
780            raise ValidationError({"non_field_errors": email_error_message})
781        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
782            LOGGER.debug("Email stage does not exist")
783            raise ValidationError({"non_field_errors": stage_error_message})
784        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
785            LOGGER.debug("User has no view access to email stage")
786            raise ValidationError({"non_field_errors": stage_error_message})
787        link, token = self._create_recovery_link(
788            token_duration=body.validated_data.get("token_duration"), for_email=True
789        )
790        message = TemplateEmailMessage(
791            subject=_(stage.subject),
792            to=[(user.name, user.email)],
793            template_name=stage.template,
794            language=user.locale(request),
795            template_context={
796                "url": link,
797                "user": user,
798                "expires": token.expires,
799            },
800        )
801        send_mails(stage, message)
802        return Response(status=204)
803
804    @permission_required("authentik_core.impersonate")
805    @extend_schema(
806        request=inline_serializer(
807            "ImpersonationSerializer",
808            {
809                "reason": CharField(required=True),
810            },
811        ),
812        responses={
813            204: OpenApiResponse(description="Successfully started impersonation"),
814        },
815    )
816    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
817    def impersonate(self, request: Request, pk: int) -> Response:
818        """Impersonate a user"""
819        if not request.tenant.impersonation:
820            LOGGER.debug("User attempted to impersonate", user=request.user)
821            return Response(status=401)
822        user_to_be = self.get_object()
823        reason = request.data.get("reason", "")
824        # Check both object-level perms and global perms
825        if not request.user.has_perm(
826            "authentik_core.impersonate", user_to_be
827        ) and not request.user.has_perm("authentik_core.impersonate"):
828            LOGGER.debug(
829                "User attempted to impersonate without permissions",
830                user=request.user,
831            )
832            return Response(status=403)
833        if user_to_be.pk == self.request.user.pk:
834            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
835            return Response(status=401)
836        if not reason and request.tenant.impersonation_require_reason:
837            LOGGER.debug(
838                "User attempted to impersonate without providing a reason",
839                user=request.user,
840            )
841            raise ValidationError({"reason": _("This field is required.")})
842
843        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
844        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
845
846        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
847
848        return Response(status=204)
849
850    @extend_schema(
851        request=None,
852        responses={
853            "204": OpenApiResponse(description="Successfully ended impersonation"),
854        },
855    )
856    @action(detail=False, methods=["GET"])
857    def impersonate_end(self, request: Request) -> Response:
858        """End Impersonation a user"""
859        if (
860            SESSION_KEY_IMPERSONATE_USER not in request.session
861            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
862        ):
863            LOGGER.debug("Can't end impersonation", user=request.user)
864            return Response(status=204)
865
866        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
867
868        del request.session[SESSION_KEY_IMPERSONATE_USER]
869        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
870
871        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
872
873        return Response(status=204)
874
875    @extend_schema(
876        responses={
877            200: inline_serializer(
878                "UserPathSerializer",
879                {"paths": ListField(child=CharField(), read_only=True)},
880            )
881        },
882        parameters=[
883            OpenApiParameter(
884                name="search",
885                location=OpenApiParameter.QUERY,
886                type=OpenApiTypes.STR,
887            )
888        ],
889    )
890    @action(detail=False, pagination_class=None)
891    def paths(self, request: Request) -> Response:
892        """Get all user paths"""
893        return Response(
894            data={
895                "paths": list(
896                    self.filter_queryset(self.get_queryset())
897                    .values("path")
898                    .distinct()
899                    .order_by("path")
900                    .values_list("path", flat=True)
901                )
902            }
903        )
904
905    def partial_update(self, request: Request, *args, **kwargs) -> Response:
906        response = super().partial_update(request, *args, **kwargs)
907        instance: User = self.get_object()
908        if not instance.is_active:
909            Session.objects.filter(authenticatedsession__user=instance).delete()
910            LOGGER.debug("Deleted user's sessions", user=instance.username)
911        return response
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class ParamUserSerializer(authentik.core.api.utils.PassiveSerializer):
class ParamUserSerializer(PassiveSerializer):
    """Partial serializer for query parameters to select a user"""

    # Optional primary key of the selected user. Candidates come from all users
    # with `exclude_anonymous()` applied.
    user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)

Partial serializer for query parameters to select a user

user
class PartialGroupSerializer(authentik.core.api.utils.ModelSerializer):
class PartialGroupSerializer(ModelSerializer):
    """Partial Group Serializer, does not include child relations."""

    # Declared explicitly so the field is optional on input.
    attributes = JSONDictField(required=False)

    class Meta:
        model = Group
        # Only scalar group data is listed here; no relation fields are included.
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "attributes",
        ]

Partial Group Serializer, does not include child relations.

attributes
class PartialGroupSerializer.Meta:
116    class Meta:
117        model = Group
118        fields = [
119            "pk",
120            "num_pk",
121            "name",
122            "is_superuser",
123            "attributes",
124        ]
model = <class 'authentik.core.models.Group'>
fields = ['pk', 'num_pk', 'name', 'is_superuser', 'attributes']
class UserSerializer(authentik.core.api.utils.ModelSerializer):
class UserSerializer(ModelSerializer):
    """User Serializer"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    attributes = JSONDictField(required=False)
    groups = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Group.objects.all().order_by("name"),
        default=list,
    )
    # Expanded group objects; null when `include_groups` is switched off.
    groups_obj = SerializerMethodField(allow_null=True)
    roles = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Role.objects.all().order_by("name"),
        default=list,
    )
    # Expanded role objects; null when `include_roles` is switched off.
    roles_obj = SerializerMethodField(allow_null=True)
    uid = CharField(read_only=True)
    username = CharField(
        max_length=USERNAME_MAX_LENGTH,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )

    def _query_flag(self, name: str) -> bool:
        """Return whether the boolean query parameter `name` is enabled.

        The flag counts as enabled when the serializer has no request in its
        context, when the parameter is absent, or when its value equals "true"
        (case-insensitive)."""
        request: Request | None = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, "true")).lower() == "true"

    @property
    def _should_include_groups(self) -> bool:
        return self._query_flag("include_groups")

    @property
    def _should_include_roles(self) -> bool:
        return self._query_flag("include_roles")

    @extend_schema_field(PartialGroupSerializer(many=True))
    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
        if not self._should_include_groups:
            return None
        return PartialGroupSerializer(instance.groups, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
        if not self._should_include_roles:
            return None
        return RoleSerializer(instance.roles, many=True).data

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # `password` and `permissions` are only accepted in a blueprint context.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["password"] = CharField(required=False, allow_null=True)
            self.fields["permissions"] = ListField(
                required=False,
                child=ChoiceField(choices=get_permission_choices()),
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list[str]:
        """Remove `permissions` from `validated_data` and return the existing ones.

        Permissions are given as `<app_label>.<codename>`. The database is
        queried by codename, and the result is then narrowed to exactly the
        requested `<app_label>.<codename>` pairs: a codename on its own may
        exist for several apps, and only the requested app's permission must
        be returned."""
        requested = set(validated_data.pop("permissions", []))
        codenames = {perm.split(".", maxsplit=1)[1] for perm in requested}
        rows = Permission.objects.filter(codename__in=codenames).values_list(
            "content_type__app_label", "codename"
        )
        resolved = (f"{app_label}.{codename}" for app_label, codename in rows)
        return [perm for perm in resolved if perm in requested]

    def create(self, validated_data: dict) -> User:
        """If this serializer is used in the blueprint context, we allow for
        directly setting a password. However should be done via the `set_password`
        method instead of directly setting it like rest_framework."""
        # Both keys must be removed before the model serializer sees the data.
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance: User = super().create(validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def update(self, instance: User, validated_data: dict) -> User:
        """Same as `create` above, set the password directly if we're in a blueprint
        context"""
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance = super().update(instance, validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def _set_password(self, instance: User, password: str | None):
        """Set password of user if we're in a blueprint context, and if it's an empty
        string then use an unusable password"""
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context and password:
            instance.set_password(password)
            instance.save()
        # Checked independently of the branch above: a user whose stored
        # password is empty gets an unusable password.
        if not instance.password:
            instance.set_unusable_password()
            instance.save()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    def validate_path(self, path: str) -> str:
        """Validate path

        Raises a ValidationError for a leading or trailing slash and for any
        empty segment; an empty path counts as a single empty segment."""
        # startswith/endswith are safe on an empty string, unlike indexing.
        if path.startswith("/") or path.endswith("/"):
            raise ValidationError(_("No leading or trailing slashes allowed."))
        if "" in path.split("/"):
            raise ValidationError(_("No empty segments in user path allowed."))
        return path

    def validate_type(self, user_type: str) -> str:
        """Validate user type, internal_service_account is an internal value"""
        if (
            self.instance
            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
        ):
            raise ValidationError(_("Can't change internal service account to other user type."))
        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
            raise ValidationError(_("Setting a user to internal service account is not allowed."))
        return user_type

    def validate(self, attrs: dict) -> dict:
        # Existing internal service accounts are rejected for any modification.
        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
            raise ValidationError(_("Can't modify internal service account users"))
        return super().validate(attrs)

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "last_login",
            "date_joined",
            "is_superuser",
            "groups",
            "groups_obj",
            "roles",
            "roles_obj",
            "email",
            "avatar",
            "attributes",
            "uid",
            "path",
            "type",
            "uuid",
            "password_change_date",
            "last_updated",
        ]
        extra_kwargs = {
            "name": {"allow_blank": True},
            "date_joined": {"read_only": True},
            "password_change_date": {"read_only": True},
        }

User Serializer

UserSerializer(*args, **kwargs)
179    def __init__(self, *args, **kwargs):
180        super().__init__(*args, **kwargs)
181        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
182            self.fields["password"] = CharField(required=False, allow_null=True)
183            self.fields["permissions"] = ListField(
184                required=False,
185                child=ChoiceField(choices=get_permission_choices()),
186            )
is_superuser
avatar
attributes
groups
groups_obj
roles
roles_obj
uid
username
@extend_schema_field(PartialGroupSerializer(many=True))
def get_groups_obj( self, instance: authentik.core.models.User) -> list[PartialGroupSerializer] | None:
167    @extend_schema_field(PartialGroupSerializer(many=True))
168    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
169        if not self._should_include_groups:
170            return None
171        return PartialGroupSerializer(instance.groups, many=True).data
@extend_schema_field(RoleSerializer(many=True))
def get_roles_obj( self, instance: authentik.core.models.User) -> list[authentik.rbac.api.roles.RoleSerializer] | None:
173    @extend_schema_field(RoleSerializer(many=True))
174    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
175        if not self._should_include_roles:
176            return None
177        return RoleSerializer(instance.roles, many=True).data
def create(self, validated_data: dict) -> authentik.core.models.User:
188    def create(self, validated_data: dict) -> User:
189        """If this serializer is used in the blueprint context, we allow for
190        directly setting a password. However should be done via the `set_password`
191        method instead of directly setting it like rest_framework."""
192        password = validated_data.pop("password", None)
193        perms_qs = Permission.objects.filter(
194            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
195        ).values_list("content_type__app_label", "codename")
196        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
197        instance: User = super().create(validated_data)
198        self._set_password(instance, password)
199        instance.assign_perms_to_managed_role(perms_list)
200        return instance

If this serializer is used in the blueprint context, we allow for directly setting a password. However should be done via the set_password method instead of directly setting it like rest_framework.

def update( self, instance: authentik.core.models.User, validated_data: dict) -> authentik.core.models.User:
202    def update(self, instance: User, validated_data: dict) -> User:
203        """Same as `create` above, set the password directly if we're in a blueprint
204        context"""
205        password = validated_data.pop("password", None)
206        perms_qs = Permission.objects.filter(
207            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
208        ).values_list("content_type__app_label", "codename")
209        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
210        instance = super().update(instance, validated_data)
211        self._set_password(instance, password)
212        instance.assign_perms_to_managed_role(perms_list)
213        return instance

Same as create above, set the password directly if we're in a blueprint context

def get_avatar(self, user: authentik.core.models.User) -> str:
225    def get_avatar(self, user: User) -> str:
226        """User's avatar, either a http/https URL or a data URI"""
227        return get_avatar(user, self.context.get("request"))

User's avatar, either a http/https URL or a data URI

def validate_path(self, path: str) -> str:
229    def validate_path(self, path: str) -> str:
230        """Validate path"""
231        if path[:1] == "/" or path[-1] == "/":
232            raise ValidationError(_("No leading or trailing slashes allowed."))
233        for segment in path.split("/"):
234            if segment == "":
235                raise ValidationError(_("No empty segments in user path allowed."))
236        return path

Validate path

def validate_type(self, user_type: str) -> str:
238    def validate_type(self, user_type: str) -> str:
239        """Validate user type, internal_service_account is an internal value"""
240        if (
241            self.instance
242            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
243            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
244        ):
245            raise ValidationError(_("Can't change internal service account to other user type."))
246        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
247            raise ValidationError(_("Setting a user to internal service account is not allowed."))
248        return user_type

Validate user type, internal_service_account is an internal value

def validate(self, attrs: dict) -> dict:
250    def validate(self, attrs: dict) -> dict:
251        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
252            raise ValidationError(_("Can't modify internal service account users"))
253        return super().validate(attrs)
class UserSerializer.Meta:
255    class Meta:
256        model = User
257        fields = [
258            "pk",
259            "username",
260            "name",
261            "is_active",
262            "last_login",
263            "date_joined",
264            "is_superuser",
265            "groups",
266            "groups_obj",
267            "roles",
268            "roles_obj",
269            "email",
270            "avatar",
271            "attributes",
272            "uid",
273            "path",
274            "type",
275            "uuid",
276            "password_change_date",
277            "last_updated",
278        ]
279        extra_kwargs = {
280            "name": {"allow_blank": True},
281            "date_joined": {"read_only": True},
282            "password_change_date": {"read_only": True},
283        }
model = <class 'authentik.core.models.User'>
fields = ['pk', 'username', 'name', 'is_active', 'last_login', 'date_joined', 'is_superuser', 'groups', 'groups_obj', 'roles', 'roles_obj', 'email', 'avatar', 'attributes', 'uid', 'path', 'type', 'uuid', 'password_change_date', 'last_updated']
extra_kwargs = {'name': {'allow_blank': True}, 'date_joined': {'read_only': True}, 'password_change_date': {'read_only': True}}
class UserSelfSerializer(authentik.core.api.utils.ModelSerializer):
class UserSelfSerializer(ModelSerializer):
    """User Serializer for information a user can retrieve about themselves"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    groups = SerializerMethodField()
    roles = SerializerMethodField()
    uid = CharField(read_only=True)
    settings = SerializerMethodField()
    system_permissions = SerializerMethodField()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfGroups",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_groups(self, user: User):
        """Return only the group names a user is member of"""
        # Work on the object being serialized, like the other getters, rather
        # than on `self.instance`.
        for group in user.all_groups().order_by("name"):
            yield {
                "name": group.name,
                "pk": group.pk,
            }

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfRoles",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_roles(self, user: User):
        """Return only the roles a user is member of"""
        # Same as `get_groups`: use the object handed to the getter.
        for role in user.all_roles().order_by("name"):
            yield {
                "name": role.name,
                "pk": role.pk,
            }

    def get_settings(self, user: User) -> dict[str, Any]:
        """Get user settings with brand and group settings applied"""
        # `self.context` also resolves the context of a parent serializer; the
        # request is still mandatory here.
        return user.group_attributes(self.context["request"]).get("settings", {})

    def get_system_permissions(self, user: User) -> list[str]:
        """Get all system permissions assigned to the user"""
        # Keep only permissions starting with "authentik_rbac" and return the
        # part after the first dot.
        return [
            perm.split(".", maxsplit=1)[1]
            for perm in user.get_all_permissions()
            if perm.startswith("authentik_rbac")
        ]

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "is_superuser",
            "groups",
            "roles",
            "email",
            "avatar",
            "uid",
            "settings",
            "type",
            "system_permissions",
        ]
        extra_kwargs = {
            "is_active": {"read_only": True},
            "name": {"allow_blank": True},
        }

User Serializer for information a user can retrieve about themselves

is_superuser
avatar
groups
roles
uid
settings
system_permissions
def get_avatar(self, user: authentik.core.models.User) -> str:
297    def get_avatar(self, user: User) -> str:
298        """User's avatar, either a http/https URL or a data URI"""
299        return get_avatar(user, self.context.get("request"))

User's avatar, either a http/https URL or a data URI

@extend_schema_field(ListSerializer(child=inline_serializer('UserSelfGroups', {'name': CharField(read_only=True), 'pk': CharField(read_only=True)})))
def get_groups(self, _: authentik.core.models.User):
301    @extend_schema_field(
302        ListSerializer(
303            child=inline_serializer(
304                "UserSelfGroups",
305                {
306                    "name": CharField(read_only=True),
307                    "pk": CharField(read_only=True),
308                },
309            )
310        )
311    )
312    def get_groups(self, _: User):
313        """Return only the group names a user is member of"""
314        for group in self.instance.all_groups().order_by("name"):
315            yield {
316                "name": group.name,
317                "pk": group.pk,
318            }

Return only the group names a user is member of

@extend_schema_field(ListSerializer(child=inline_serializer('UserSelfRoles', {'name': CharField(read_only=True), 'pk': CharField(read_only=True)})))
def get_roles(self, _: authentik.core.models.User):
320    @extend_schema_field(
321        ListSerializer(
322            child=inline_serializer(
323                "UserSelfRoles",
324                {
325                    "name": CharField(read_only=True),
326                    "pk": CharField(read_only=True),
327                },
328            )
329        )
330    )
331    def get_roles(self, _: User):
332        """Return only the roles a user is member of"""
333        for role in self.instance.all_roles().order_by("name"):
334            yield {
335                "name": role.name,
336                "pk": role.pk,
337            }

Return only the roles a user is member of

def get_settings(self, user: authentik.core.models.User) -> dict[str, typing.Any]:
339    def get_settings(self, user: User) -> dict[str, Any]:
340        """Get user settings with brand and group settings applied"""
341        return user.group_attributes(self._context["request"]).get("settings", {})

Get user settings with brand and group settings applied

def get_system_permissions(self, user: authentik.core.models.User) -> list[str]:
343    def get_system_permissions(self, user: User) -> list[str]:
344        """Get all system permissions assigned to the user"""
345        return list(
346            x.split(".", maxsplit=1)[1]
347            for x in user.get_all_permissions()
348            if x.startswith("authentik_rbac")
349        )

Get all system permissions assigned to the user

class UserSelfSerializer.Meta:
351    class Meta:
352        model = User
353        fields = [
354            "pk",
355            "username",
356            "name",
357            "is_active",
358            "is_superuser",
359            "groups",
360            "roles",
361            "email",
362            "avatar",
363            "uid",
364            "settings",
365            "type",
366            "system_permissions",
367        ]
368        extra_kwargs = {
369            "is_active": {"read_only": True},
370            "name": {"allow_blank": True},
371        }
model = <class 'authentik.core.models.User'>
fields = ['pk', 'username', 'name', 'is_active', 'is_superuser', 'groups', 'roles', 'email', 'avatar', 'uid', 'settings', 'type', 'system_permissions']
extra_kwargs = {'is_active': {'read_only': True}, 'name': {'allow_blank': True}}
class SessionUserSerializer(authentik.core.api.utils.PassiveSerializer):
class SessionUserSerializer(PassiveSerializer):
    """Response for the /user/me endpoint, returns the currently active user (as `user` property)
    and, if this user is being impersonated, the original user in the `original` property.
    """

    # Always present.
    user = UserSelfSerializer()
    # Optional (`required=False`).
    original = UserSelfSerializer(required=False)

Response for the /user/me endpoint, returns the currently active user (as user property) and, if this user is being impersonated, the original user in the original property.

user
original
class UserPasswordSetSerializer(authentik.core.api.utils.PassiveSerializer):
class UserPasswordSetSerializer(PassiveSerializer):
    """Payload to set a users' password directly"""

    # The new password; mandatory.
    password = CharField(required=True)

Payload to set a users' password directly

password
class UserServiceAccountSerializer(authentik.core.api.utils.PassiveSerializer):
class UserServiceAccountSerializer(PassiveSerializer):
    """Payload to create a service account"""

    # Mandatory and validated for uniqueness against existing users.
    # NOTE(review): UniqueValidator derives its lookup from the field name, so
    # this presumably compares against `User.name` rather than
    # `User.username` - confirm this is intended.
    name = CharField(
        required=True,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )
    # Defaults to False.
    create_group = BooleanField(default=False)
    # Defaults to True.
    expiring = BooleanField(default=True)
    # Optional; see the help text for what happens when it is omitted.
    expires = DateTimeField(
        required=False,
        help_text="If not provided, valid for 360 days",
    )

Payload to create a service account

name
create_group
expiring
expires
class UserRecoveryLinkSerializer(authentik.core.api.utils.PassiveSerializer):
class UserRecoveryLinkSerializer(PassiveSerializer):
    """Payload to create a recovery link"""

    # Optional, accepted as a plain string.
    token_duration = CharField(required=False)

Payload to create a recovery link

token_duration
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
    """Payload to create and email a recovery link"""

    # Mandatory UUID; `recovery_email` uses it as the primary key when looking
    # up the EmailStage.
    email_stage = UUIDField()

Payload to create and email a recovery link

email_stage
class UsersFilter(django_filters.filterset.FilterSet):
class UsersFilter(FilterSet):
    """Filter for users"""

    # JSON object given as a string; handled by `filter_attributes`.
    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
    date_joined = IsoDateTimeFilter(field_name="date_joined")
    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")

    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
    last_updated = IsoDateTimeFilter(field_name="last_updated")
    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")

    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
    last_login = IsoDateTimeFilter(field_name="last_login")
    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")

    # Resolved through group membership, see `filter_is_superuser`.
    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
    uuid = UUIDFilter(field_name="uuid")

    path = CharFilter(field_name="path")
    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")

    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")

    groups_by_name = ModelMultipleChoiceFilter(
        field_name="groups__name",
        to_field_name="name",
        queryset=Group.objects.all().order_by("name"),
    )
    groups_by_pk = ModelMultipleChoiceFilter(
        field_name="groups",
        queryset=Group.objects.all().order_by("name"),
    )

    roles_by_name = ModelMultipleChoiceFilter(
        field_name="roles__name",
        to_field_name="name",
        queryset=Role.objects.all().order_by("name"),
    )
    roles_by_pk = ModelMultipleChoiceFilter(
        field_name="roles",
        queryset=Role.objects.all().order_by("name"),
    )

    def filter_is_superuser(self, queryset, name, value):
        # A truthy value keeps users that are in a superuser group; a falsy one
        # excludes them. The result is made distinct in both cases.
        narrow = queryset.filter if value else queryset.exclude
        return narrow(groups__is_superuser=True).distinct()

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args"""
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(_("filter: failed to parse JSON")) from None
        if not isinstance(parsed, dict):
            raise ValidationError(_("filter: value must be key:value mapping"))
        # Every top-level key becomes a lookup below the `attributes` field.
        lookups = {f"attributes__{key}": expected for key, expected in parsed.items()}
        try:
            # The filtered queryset is evaluated once and discarded; presumably
            # this surfaces a ValueError here, in which case the filter is
            # ignored. A fresh, unevaluated queryset is returned on success.
            len(queryset.filter(**lookups))
            return queryset.filter(**lookups)
        except ValueError:
            return queryset

    class Meta:
        model = User
        fields = [
            "username",
            "email",
            "date_joined",
            "last_updated",
            "last_login",
            "name",
            "is_active",
            "is_superuser",
            "attributes",
            "groups_by_name",
            "groups_by_pk",
            "roles_by_name",
            "roles_by_pk",
            "type",
        ]

Filter for users

attributes
date_joined__lt
date_joined
date_joined__gt
last_updated__lt
last_updated
last_updated__gt
last_login__lt
last_login
last_login__gt
last_login__isnull
is_superuser
uuid
path
path_startswith
type
groups_by_name
groups_by_pk
roles_by_name
roles_by_pk
def filter_is_superuser(self, queryset, name, value):
467    def filter_is_superuser(self, queryset, name, value):
468        if value:
469            return queryset.filter(groups__is_superuser=True).distinct()
470        return queryset.exclude(groups__is_superuser=True).distinct()
def filter_attributes(self, queryset, name, value):
472    def filter_attributes(self, queryset, name, value):
473        """Filter attributes by query args"""
474        try:
475            value = loads(value)
476        except ValueError:
477            raise ValidationError(_("filter: failed to parse JSON")) from None
478        if not isinstance(value, dict):
479            raise ValidationError(_("filter: value must be key:value mapping"))
480        qs = {}
481        for key, _value in value.items():
482            qs[f"attributes__{key}"] = _value
483        try:
484            __ = len(queryset.filter(**qs))
485            return queryset.filter(**qs)
486        except ValueError:
487            return queryset

Filter attributes by query args

declared_filters = OrderedDict({'attributes': <django_filters.filters.CharFilter object>, 'date_joined__lt': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__isnull': <django_filters.filters.BooleanFilter object>, 'is_superuser': <django_filters.filters.BooleanFilter object>, 'uuid': <django_filters.filters.UUIDFilter object>, 'path': <django_filters.filters.CharFilter object>, 'path_startswith': <django_filters.filters.CharFilter object>, 'type': <django_filters.filters.MultipleChoiceFilter object>, 'groups_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'groups_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
base_filters = OrderedDict({'username': <django_filters.filters.CharFilter object>, 'email': <django_filters.filters.CharFilter object>, 'date_joined': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated': <django_filters.filters.IsoDateTimeFilter object>, 'last_login': <django_filters.filters.IsoDateTimeFilter object>, 'name': <django_filters.filters.CharFilter object>, 'is_active': <django_filters.filters.BooleanFilter object>, 'is_superuser': <django_filters.filters.BooleanFilter object>, 'attributes': <django_filters.filters.CharFilter object>, 'groups_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'groups_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_name': <django_filters.filters.ModelMultipleChoiceFilter object>, 'roles_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'type': <django_filters.filters.MultipleChoiceFilter object>, 'date_joined__lt': <django_filters.filters.IsoDateTimeFilter object>, 'date_joined__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_updated__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__lt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__gt': <django_filters.filters.IsoDateTimeFilter object>, 'last_login__isnull': <django_filters.filters.BooleanFilter object>, 'uuid': <django_filters.filters.UUIDFilter object>, 'path': <django_filters.filters.CharFilter object>, 'path_startswith': <django_filters.filters.CharFilter object>})
class UsersFilter.Meta:
    class Meta:
        # Filterset options: the model being filtered and the filter names to
        # expose. The list mixes plain model fields with names such as
        # "groups_by_name" that are declared as explicit filters on the
        # enclosing filterset.
        model = User
        fields = [
            "username",
            "email",
            "date_joined",
            "last_updated",
            "last_login",
            "name",
            "is_active",
            "is_superuser",
            "attributes",
            "groups_by_name",
            "groups_by_pk",
            "roles_by_name",
            "roles_by_pk",
            "type",
        ]
model = <class 'authentik.core.models.User'>
fields = ['username', 'email', 'date_joined', 'last_updated', 'last_login', 'name', 'is_active', 'is_superuser', 'attributes', 'groups_by_name', 'groups_by_pk', 'roles_by_name', 'roles_by_pk', 'type']
class UserViewSet(authentik.enterprise.reports.api.reports.ExportMixin, authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
509class UserViewSet(
510    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
511    UsedByMixin,
512    ModelViewSet,
513):
514    """User Viewset"""
515
516    queryset = User.objects.none()
517    ordering = ["username", "date_joined", "last_updated", "last_login"]
518    serializer_class = UserSerializer
519    filterset_class = UsersFilter
520    search_fields = ["email", "name", "uuid", "username"]
521    authentication_classes = [
522        TokenAuthentication,
523        SessionAuthentication,
524        AgentAuth,
525    ]
526
527    def get_ql_fields(self):
528        from djangoql.schema import BoolField, StrField
529
530        from authentik.enterprise.search.fields import (
531            ChoiceSearchField,
532            JSONSearchField,
533        )
534
535        return [
536            StrField(User, "username"),
537            StrField(User, "name"),
538            StrField(User, "email"),
539            StrField(User, "path"),
540            BoolField(User, "is_active", nullable=True),
541            ChoiceSearchField(User, "type"),
542            JSONSearchField(User, "attributes"),
543        ]
544
545    def get_queryset(self):
546        base_qs = User.objects.all().exclude_anonymous()
547        if self.serializer_class(context={"request": self.request})._should_include_groups:
548            base_qs = base_qs.prefetch_related("groups")
549        if self.serializer_class(context={"request": self.request})._should_include_roles:
550            base_qs = base_qs.prefetch_related("roles")
551        return base_qs
552
    # Pure pass-through to the parent implementation; the override only gives
    # @extend_schema a place to declare the include_groups / include_roles
    # query parameters.
    @extend_schema(
        parameters=[
            OpenApiParameter("include_groups", bool, default=True),
            OpenApiParameter("include_roles", bool, default=True),
        ]
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)
561
562    def _create_recovery_link(
563        self, token_duration: str | None, for_email=False
564    ) -> tuple[str, Token]:
565        """Create a recovery link (when the current brand has a recovery flow set),
566        that can either be shown to an admin or sent to the user directly"""
567        brand: Brand = self.request.brand
568        # Check that there is a recovery flow, if not return an error
569        flow = brand.flow_recovery
570        if not flow:
571            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
572        user: User = self.get_object()
573        planner = FlowPlanner(flow)
574        planner.allow_empty_flows = True
575        self.request._request.user = AnonymousUser()
576        try:
577            plan = planner.plan(
578                self.request._request,
579                {
580                    PLAN_CONTEXT_PENDING_USER: user,
581                },
582            )
583        except FlowNonApplicableException:
584            raise ValidationError(
585                {"non_field_errors": _("Recovery flow not applicable to user")}
586            ) from None
587        _plan = FlowToken.pickle(plan)
588        if for_email:
589            _plan = pickle_flow_token_for_email(plan)
590        expires = default_token_duration()
591        if token_duration:
592            timedelta_string_validator(token_duration)
593            expires = now() + timedelta_from_string(token_duration)
594        token, __ = FlowToken.objects.update_or_create(
595            identifier=f"{user.uid}-password-reset",
596            defaults={
597                "user": user,
598                "flow": flow,
599                "_plan": _plan,
600                "revoke_on_execution": not for_email,
601                "expires": expires,
602            },
603        )
604        querystring = urlencode({QS_KEY_TOKEN: token.key})
605        link = self.request.build_absolute_uri(
606            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
607            + f"?{querystring}"
608        )
609        return link, token
610
611    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
612    @extend_schema(
613        request=UserServiceAccountSerializer,
614        responses={
615            200: inline_serializer(
616                "UserServiceAccountResponse",
617                {
618                    "username": CharField(required=True),
619                    "token": CharField(required=True),
620                    "user_uid": CharField(required=True),
621                    "user_pk": IntegerField(required=True),
622                    "group_pk": CharField(required=False),
623                },
624            )
625        },
626    )
627    @action(
628        detail=False,
629        methods=["POST"],
630        pagination_class=None,
631        filter_backends=[],
632    )
633    @validate(UserServiceAccountSerializer)
634    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
635        """Create a new user account that is marked as a service account"""
636        expires = body.validated_data.get("expires", now() + timedelta(days=360))
637
638        username = body.validated_data["name"]
639        expiring = body.validated_data["expiring"]
640        with atomic():
641            try:
642                user: User = User.objects.create(
643                    username=username,
644                    name=username,
645                    type=UserTypes.SERVICE_ACCOUNT,
646                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
647                    path=USER_PATH_SERVICE_ACCOUNT,
648                )
649                user.set_unusable_password()
650                user.save()
651
652                response = {
653                    "username": user.username,
654                    "user_uid": user.uid,
655                    "user_pk": user.pk,
656                }
657                if body.validated_data["create_group"] and self.request.user.has_perm(
658                    "authentik_core.add_group"
659                ):
660                    group = Group.objects.create(name=username)
661                    group.users.add(user)
662                    response["group_pk"] = str(group.pk)
663                token = Token.objects.create(
664                    identifier=slugify(f"service-account-{username}-password"),
665                    intent=TokenIntents.INTENT_APP_PASSWORD,
666                    user=user,
667                    expires=expires,
668                    expiring=expiring,
669                )
670                response["token"] = token.key
671                return Response(response)
672            except IntegrityError as exc:
673                error_msg = str(exc).lower()
674
675                if "unique" in error_msg:
676                    return Response(
677                        data={
678                            "non_field_errors": [
679                                _("A user/group with these details already exists")
680                            ]
681                        },
682                        status=400,
683                    )
684                else:
685                    LOGGER.warning("Service account creation failed", exc=exc)
686                    return Response(
687                        data={"non_field_errors": [_("Unable to create user")]},
688                        status=400,
689                    )
690            except (ValueError, TypeError) as exc:
691                LOGGER.error("Unexpected error during service account creation", exc=exc)
692                return Response(
693                    data={"non_field_errors": [_("Unknown error occurred")]},
694                    status=500,
695                )
696
697    @extend_schema(responses={200: SessionUserSerializer(many=False)})
698    @action(
699        url_path="me",
700        url_name="me",
701        detail=False,
702        pagination_class=None,
703        filter_backends=[],
704    )
705    def user_me(self, request: Request) -> Response:
706        """Get information about current user"""
707        context = {"request": request}
708        serializer = SessionUserSerializer(
709            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
710        )
711        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
712            serializer.initial_data["original"] = UserSelfSerializer(
713                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
714                context=context,
715            ).data
716        self.request.session.modified = True
717        return Response(serializer.initial_data)
718
719    @permission_required("authentik_core.reset_user_password")
720    @extend_schema(
721        request=UserPasswordSetSerializer,
722        responses={
723            204: OpenApiResponse(description="Successfully changed password"),
724            400: OpenApiResponse(description="Bad request"),
725        },
726    )
727    @action(
728        detail=True,
729        methods=["POST"],
730        permission_classes=[IsAuthenticated],
731    )
732    @validate(UserPasswordSetSerializer)
733    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
734        """Set password for user"""
735        user: User = self.get_object()
736        try:
737            user.set_password(body.validated_data["password"], request=request)
738            user.save()
739        except (ValidationError, IntegrityError) as exc:
740            LOGGER.debug("Failed to set password", exc=exc)
741            return Response(status=400)
742        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
743            LOGGER.debug("Updating session hash after password change")
744            update_session_auth_hash(self.request, user)
745        return Response(status=204)
746
747    @permission_required("authentik_core.reset_user_password")
748    @extend_schema(
749        request=UserRecoveryLinkSerializer,
750        responses={
751            "200": LinkSerializer(many=False),
752        },
753    )
754    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
755    @validate(UserRecoveryLinkSerializer)
756    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
757        """Create a temporary link that a user can use to recover their account"""
758        link, _ = self._create_recovery_link(
759            token_duration=body.validated_data.get("token_duration")
760        )
761        return Response({"link": link})
762
763    @permission_required("authentik_core.reset_user_password")
764    @extend_schema(
765        request=UserRecoveryEmailSerializer,
766        responses={
767            "204": OpenApiResponse(description="Successfully sent recover email"),
768        },
769    )
770    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
771    @validate(UserRecoveryEmailSerializer)
772    def recovery_email(
773        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
774    ) -> Response:
775        """Send an email with a temporary link that a user can use to recover their account"""
776        email_error_message = _("User does not have an email address set.")
777        stage_error_message = _("Email stage not found.")
778        user: User = self.get_object()
779        if not user.email:
780            LOGGER.debug("User doesn't have an email address")
781            raise ValidationError({"non_field_errors": email_error_message})
782        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
783            LOGGER.debug("Email stage does not exist")
784            raise ValidationError({"non_field_errors": stage_error_message})
785        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
786            LOGGER.debug("User has no view access to email stage")
787            raise ValidationError({"non_field_errors": stage_error_message})
788        link, token = self._create_recovery_link(
789            token_duration=body.validated_data.get("token_duration"), for_email=True
790        )
791        message = TemplateEmailMessage(
792            subject=_(stage.subject),
793            to=[(user.name, user.email)],
794            template_name=stage.template,
795            language=user.locale(request),
796            template_context={
797                "url": link,
798                "user": user,
799                "expires": token.expires,
800            },
801        )
802        send_mails(stage, message)
803        return Response(status=204)
804
805    @permission_required("authentik_core.impersonate")
806    @extend_schema(
807        request=inline_serializer(
808            "ImpersonationSerializer",
809            {
810                "reason": CharField(required=True),
811            },
812        ),
813        responses={
814            204: OpenApiResponse(description="Successfully started impersonation"),
815        },
816    )
817    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
818    def impersonate(self, request: Request, pk: int) -> Response:
819        """Impersonate a user"""
820        if not request.tenant.impersonation:
821            LOGGER.debug("User attempted to impersonate", user=request.user)
822            return Response(status=401)
823        user_to_be = self.get_object()
824        reason = request.data.get("reason", "")
825        # Check both object-level perms and global perms
826        if not request.user.has_perm(
827            "authentik_core.impersonate", user_to_be
828        ) and not request.user.has_perm("authentik_core.impersonate"):
829            LOGGER.debug(
830                "User attempted to impersonate without permissions",
831                user=request.user,
832            )
833            return Response(status=403)
834        if user_to_be.pk == self.request.user.pk:
835            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
836            return Response(status=401)
837        if not reason and request.tenant.impersonation_require_reason:
838            LOGGER.debug(
839                "User attempted to impersonate without providing a reason",
840                user=request.user,
841            )
842            raise ValidationError({"reason": _("This field is required.")})
843
844        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
845        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
846
847        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
848
849        return Response(status=204)
850
851    @extend_schema(
852        request=None,
853        responses={
854            "204": OpenApiResponse(description="Successfully ended impersonation"),
855        },
856    )
857    @action(detail=False, methods=["GET"])
858    def impersonate_end(self, request: Request) -> Response:
859        """End Impersonation a user"""
860        if (
861            SESSION_KEY_IMPERSONATE_USER not in request.session
862            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
863        ):
864            LOGGER.debug("Can't end impersonation", user=request.user)
865            return Response(status=204)
866
867        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
868
869        del request.session[SESSION_KEY_IMPERSONATE_USER]
870        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
871
872        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
873
874        return Response(status=204)
875
876    @extend_schema(
877        responses={
878            200: inline_serializer(
879                "UserPathSerializer",
880                {"paths": ListField(child=CharField(), read_only=True)},
881            )
882        },
883        parameters=[
884            OpenApiParameter(
885                name="search",
886                location=OpenApiParameter.QUERY,
887                type=OpenApiTypes.STR,
888            )
889        ],
890    )
891    @action(detail=False, pagination_class=None)
892    def paths(self, request: Request) -> Response:
893        """Get all user paths"""
894        return Response(
895            data={
896                "paths": list(
897                    self.filter_queryset(self.get_queryset())
898                    .values("path")
899                    .distinct()
900                    .order_by("path")
901                    .values_list("path", flat=True)
902                )
903            }
904        )
905
906    def partial_update(self, request: Request, *args, **kwargs) -> Response:
907        response = super().partial_update(request, *args, **kwargs)
908        instance: User = self.get_object()
909        if not instance.is_active:
910            Session.objects.filter(authenticatedsession__user=instance).delete()
911            LOGGER.debug("Deleted user's sessions", user=instance.username)
912        return response

User Viewset

queryset = <UserQuerySet []>
ordering = ['username', 'date_joined', 'last_updated', 'last_login']
serializer_class = <class 'UserSerializer'>
filterset_class = <class 'UsersFilter'>
search_fields = ['email', 'name', 'uuid', 'username']
authentication_classes = [<class 'authentik.api.authentication.TokenAuthentication'>, <class 'rest_framework.authentication.SessionAuthentication'>, <class 'authentik.endpoints.connectors.agent.auth.AgentAuth'>]
def get_ql_fields(self):
527    def get_ql_fields(self):
528        from djangoql.schema import BoolField, StrField
529
530        from authentik.enterprise.search.fields import (
531            ChoiceSearchField,
532            JSONSearchField,
533        )
534
535        return [
536            StrField(User, "username"),
537            StrField(User, "name"),
538            StrField(User, "email"),
539            StrField(User, "path"),
540            BoolField(User, "is_active", nullable=True),
541            ChoiceSearchField(User, "type"),
542            JSONSearchField(User, "attributes"),
543        ]
def get_queryset(self):
545    def get_queryset(self):
546        base_qs = User.objects.all().exclude_anonymous()
547        if self.serializer_class(context={"request": self.request})._should_include_groups:
548            base_qs = base_qs.prefetch_related("groups")
549        if self.serializer_class(context={"request": self.request})._should_include_roles:
550            base_qs = base_qs.prefetch_related("roles")
551        return base_qs

Get the list of items for this view. This must be an iterable, and may be a queryset. Defaults to using self.queryset.

This method should always be used rather than accessing self.queryset directly, as self.queryset gets evaluated only once, and those results are cached for all subsequent requests.

You may want to override this if you need to provide different querysets depending on the incoming request.

(Eg. return a list of items that is specific to the user)

@extend_schema(parameters=[OpenApiParameter('include_groups', bool, default=True), OpenApiParameter('include_roles', bool, default=True)])
def list(self, request, *args, **kwargs):
    # Pure pass-through to the parent implementation; the override only gives
    # @extend_schema a place to declare the include_groups / include_roles
    # query parameters.
    @extend_schema(
        parameters=[
            OpenApiParameter("include_groups", bool, default=True),
            OpenApiParameter("include_roles", bool, default=True),
        ]
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)
@permission_required(None, ['authentik_core.add_user', 'authentik_core.add_token'])
@extend_schema(request=UserServiceAccountSerializer, responses={200: inline_serializer('UserServiceAccountResponse', {'username': CharField(required=True), 'token': CharField(required=True), 'user_uid': CharField(required=True), 'user_pk': IntegerField(required=True), 'group_pk': CharField(required=False)})})
@action(detail=False, methods=['POST'], pagination_class=None, filter_backends=[])
@validate(UserServiceAccountSerializer)
def service_account( self, request: rest_framework.request.Request, body: UserServiceAccountSerializer) -> rest_framework.response.Response:
611    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
612    @extend_schema(
613        request=UserServiceAccountSerializer,
614        responses={
615            200: inline_serializer(
616                "UserServiceAccountResponse",
617                {
618                    "username": CharField(required=True),
619                    "token": CharField(required=True),
620                    "user_uid": CharField(required=True),
621                    "user_pk": IntegerField(required=True),
622                    "group_pk": CharField(required=False),
623                },
624            )
625        },
626    )
627    @action(
628        detail=False,
629        methods=["POST"],
630        pagination_class=None,
631        filter_backends=[],
632    )
633    @validate(UserServiceAccountSerializer)
634    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
635        """Create a new user account that is marked as a service account"""
636        expires = body.validated_data.get("expires", now() + timedelta(days=360))
637
638        username = body.validated_data["name"]
639        expiring = body.validated_data["expiring"]
640        with atomic():
641            try:
642                user: User = User.objects.create(
643                    username=username,
644                    name=username,
645                    type=UserTypes.SERVICE_ACCOUNT,
646                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
647                    path=USER_PATH_SERVICE_ACCOUNT,
648                )
649                user.set_unusable_password()
650                user.save()
651
652                response = {
653                    "username": user.username,
654                    "user_uid": user.uid,
655                    "user_pk": user.pk,
656                }
657                if body.validated_data["create_group"] and self.request.user.has_perm(
658                    "authentik_core.add_group"
659                ):
660                    group = Group.objects.create(name=username)
661                    group.users.add(user)
662                    response["group_pk"] = str(group.pk)
663                token = Token.objects.create(
664                    identifier=slugify(f"service-account-{username}-password"),
665                    intent=TokenIntents.INTENT_APP_PASSWORD,
666                    user=user,
667                    expires=expires,
668                    expiring=expiring,
669                )
670                response["token"] = token.key
671                return Response(response)
672            except IntegrityError as exc:
673                error_msg = str(exc).lower()
674
675                if "unique" in error_msg:
676                    return Response(
677                        data={
678                            "non_field_errors": [
679                                _("A user/group with these details already exists")
680                            ]
681                        },
682                        status=400,
683                    )
684                else:
685                    LOGGER.warning("Service account creation failed", exc=exc)
686                    return Response(
687                        data={"non_field_errors": [_("Unable to create user")]},
688                        status=400,
689                    )
690            except (ValueError, TypeError) as exc:
691                LOGGER.error("Unexpected error during service account creation", exc=exc)
692                return Response(
693                    data={"non_field_errors": [_("Unknown error occurred")]},
694                    status=500,
695                )

Create a new user account that is marked as a service account

@extend_schema(responses={200: SessionUserSerializer(many=False)})
@action(url_path='me', url_name='me', detail=False, pagination_class=None, filter_backends=[])
def user_me( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
697    @extend_schema(responses={200: SessionUserSerializer(many=False)})
698    @action(
699        url_path="me",
700        url_name="me",
701        detail=False,
702        pagination_class=None,
703        filter_backends=[],
704    )
705    def user_me(self, request: Request) -> Response:
706        """Get information about current user"""
707        context = {"request": request}
708        serializer = SessionUserSerializer(
709            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
710        )
711        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
712            serializer.initial_data["original"] = UserSelfSerializer(
713                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
714                context=context,
715            ).data
716        self.request.session.modified = True
717        return Response(serializer.initial_data)

Get information about current user

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserPasswordSetSerializer, responses={204: OpenApiResponse(description='Successfully changed password'), 400: OpenApiResponse(description='Bad request')})
@action(detail=True, methods=['POST'], permission_classes=[IsAuthenticated])
@validate(UserPasswordSetSerializer)
def set_password( self, request: rest_framework.request.Request, pk: int, body: UserPasswordSetSerializer) -> rest_framework.response.Response:
719    @permission_required("authentik_core.reset_user_password")
720    @extend_schema(
721        request=UserPasswordSetSerializer,
722        responses={
723            204: OpenApiResponse(description="Successfully changed password"),
724            400: OpenApiResponse(description="Bad request"),
725        },
726    )
727    @action(
728        detail=True,
729        methods=["POST"],
730        permission_classes=[IsAuthenticated],
731    )
732    @validate(UserPasswordSetSerializer)
733    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
734        """Set password for user"""
735        user: User = self.get_object()
736        try:
737            user.set_password(body.validated_data["password"], request=request)
738            user.save()
739        except (ValidationError, IntegrityError) as exc:
740            LOGGER.debug("Failed to set password", exc=exc)
741            return Response(status=400)
742        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
743            LOGGER.debug("Updating session hash after password change")
744            update_session_auth_hash(self.request, user)
745        return Response(status=204)

Set password for user

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserRecoveryLinkSerializer, responses={'200': LinkSerializer(many=False)})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(UserRecoveryLinkSerializer)
def recovery( self, request: rest_framework.request.Request, pk: int, body: UserRecoveryLinkSerializer) -> rest_framework.response.Response:
747    @permission_required("authentik_core.reset_user_password")
748    @extend_schema(
749        request=UserRecoveryLinkSerializer,
750        responses={
751            "200": LinkSerializer(many=False),
752        },
753    )
754    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
755    @validate(UserRecoveryLinkSerializer)
756    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
757        """Create a temporary link that a user can use to recover their account"""
758        link, _ = self._create_recovery_link(
759            token_duration=body.validated_data.get("token_duration")
760        )
761        return Response({"link": link})

Create a temporary link that a user can use to recover their account

@permission_required('authentik_core.reset_user_password')
@extend_schema(request=UserRecoveryEmailSerializer, responses={'204': OpenApiResponse(description='Successfully sent recover email')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(UserRecoveryEmailSerializer)
def recovery_email( self, request: rest_framework.request.Request, pk: int, body: UserRecoveryEmailSerializer) -> rest_framework.response.Response:
763    @permission_required("authentik_core.reset_user_password")
764    @extend_schema(
765        request=UserRecoveryEmailSerializer,
766        responses={
767            "204": OpenApiResponse(description="Successfully sent recover email"),
768        },
769    )
770    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
771    @validate(UserRecoveryEmailSerializer)
772    def recovery_email(
773        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
774    ) -> Response:
775        """Send an email with a temporary link that a user can use to recover their account"""
776        email_error_message = _("User does not have an email address set.")
777        stage_error_message = _("Email stage not found.")
778        user: User = self.get_object()
779        if not user.email:
780            LOGGER.debug("User doesn't have an email address")
781            raise ValidationError({"non_field_errors": email_error_message})
782        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
783            LOGGER.debug("Email stage does not exist")
784            raise ValidationError({"non_field_errors": stage_error_message})
785        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
786            LOGGER.debug("User has no view access to email stage")
787            raise ValidationError({"non_field_errors": stage_error_message})
788        link, token = self._create_recovery_link(
789            token_duration=body.validated_data.get("token_duration"), for_email=True
790        )
791        message = TemplateEmailMessage(
792            subject=_(stage.subject),
793            to=[(user.name, user.email)],
794            template_name=stage.template,
795            language=user.locale(request),
796            template_context={
797                "url": link,
798                "user": user,
799                "expires": token.expires,
800            },
801        )
802        send_mails(stage, message)
803        return Response(status=204)

Send an email with a temporary link that a user can use to recover their account

@permission_required('authentik_core.impersonate')
@extend_schema(request=inline_serializer('ImpersonationSerializer', {'reason': CharField(required=True)}), responses={204: OpenApiResponse(description='Successfully started impersonation')})
@action(detail=True, methods=['POST'], permission_classes=[IsAuthenticated])
def impersonate( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
805    @permission_required("authentik_core.impersonate")
806    @extend_schema(
807        request=inline_serializer(
808            "ImpersonationSerializer",
809            {
810                "reason": CharField(required=True),
811            },
812        ),
813        responses={
814            204: OpenApiResponse(description="Successfully started impersonation"),
815        },
816    )
817    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
818    def impersonate(self, request: Request, pk: int) -> Response:
819        """Impersonate a user"""
820        if not request.tenant.impersonation:
821            LOGGER.debug("User attempted to impersonate", user=request.user)
822            return Response(status=401)
823        user_to_be = self.get_object()
824        reason = request.data.get("reason", "")
825        # Check both object-level perms and global perms
826        if not request.user.has_perm(
827            "authentik_core.impersonate", user_to_be
828        ) and not request.user.has_perm("authentik_core.impersonate"):
829            LOGGER.debug(
830                "User attempted to impersonate without permissions",
831                user=request.user,
832            )
833            return Response(status=403)
834        if user_to_be.pk == self.request.user.pk:
835            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
836            return Response(status=401)
837        if not reason and request.tenant.impersonation_require_reason:
838            LOGGER.debug(
839                "User attempted to impersonate without providing a reason",
840                user=request.user,
841            )
842            raise ValidationError({"reason": _("This field is required.")})
843
844        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
845        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be
846
847        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)
848
849        return Response(status=204)

Impersonate a user

@extend_schema(request=None, responses={'204': OpenApiResponse(description='Successfully ended impersonation')})
@action(detail=False, methods=['GET'])
def impersonate_end( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
851    @extend_schema(
852        request=None,
853        responses={
854            "204": OpenApiResponse(description="Successfully ended impersonation"),
855        },
856    )
857    @action(detail=False, methods=["GET"])
858    def impersonate_end(self, request: Request) -> Response:
859        """End Impersonation a user"""
860        if (
861            SESSION_KEY_IMPERSONATE_USER not in request.session
862            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
863        ):
864            LOGGER.debug("Can't end impersonation", user=request.user)
865            return Response(status=204)
866
867        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
868
869        del request.session[SESSION_KEY_IMPERSONATE_USER]
870        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]
871
872        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)
873
874        return Response(status=204)

End impersonation of a user

@extend_schema(responses={200: inline_serializer('UserPathSerializer', {'paths': ListField(child=CharField(), read_only=True)})}, parameters=[OpenApiParameter(name='search', location=OpenApiParameter.QUERY, type=OpenApiTypes.STR)])
@action(detail=False, pagination_class=None)
def paths( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
876    @extend_schema(
877        responses={
878            200: inline_serializer(
879                "UserPathSerializer",
880                {"paths": ListField(child=CharField(), read_only=True)},
881            )
882        },
883        parameters=[
884            OpenApiParameter(
885                name="search",
886                location=OpenApiParameter.QUERY,
887                type=OpenApiTypes.STR,
888            )
889        ],
890    )
891    @action(detail=False, pagination_class=None)
892    def paths(self, request: Request) -> Response:
893        """Get all user paths"""
894        return Response(
895            data={
896                "paths": list(
897                    self.filter_queryset(self.get_queryset())
898                    .values("path")
899                    .distinct()
900                    .order_by("path")
901                    .values_list("path", flat=True)
902                )
903            }
904        )

Get all user paths

def partial_update( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
906    def partial_update(self, request: Request, *args, **kwargs) -> Response:
907        response = super().partial_update(request, *args, **kwargs)
908        instance: User = self.get_object()
909        if not instance.is_active:
910            Session.objects.filter(authenticatedsession__user=instance).delete()
911            LOGGER.debug("Deleted user's sessions", user=instance.username)
912        return response
# NOTE(review): these appear to be class-level attributes listed by the doc
# generator rather than statements written in this module; they match the
# attributes Django REST framework's viewset machinery resets to None on the
# class when building views - confirm against the real source file.
name = None
description = None
suffix = None
detail = None
basename = None