authentik.core.api.users
User API Views
1"""User API Views""" 2 3from datetime import timedelta 4from json import loads 5from typing import Any 6 7from django.contrib.auth import update_session_auth_hash 8from django.contrib.auth.models import AnonymousUser, Permission 9from django.db.models import Exists, OuterRef, Prefetch, Q 10from django.db.transaction import atomic 11from django.db.utils import IntegrityError 12from django.urls import reverse_lazy 13from django.utils.http import urlencode 14from django.utils.text import slugify 15from django.utils.timezone import now 16from django.utils.translation import gettext as _ 17from django.utils.translation import gettext_lazy 18from django_filters.filters import ( 19 BooleanFilter, 20 CharFilter, 21 IsoDateTimeFilter, 22 ModelMultipleChoiceFilter, 23 MultipleChoiceFilter, 24 UUIDFilter, 25) 26from django_filters.filterset import FilterSet 27from djangoql.schema import BoolField, StrField 28from drf_spectacular.types import OpenApiTypes 29from drf_spectacular.utils import ( 30 OpenApiParameter, 31 OpenApiResponse, 32 extend_schema, 33 extend_schema_field, 34 inline_serializer, 35) 36from rest_framework.authentication import SessionAuthentication 37from rest_framework.decorators import action 38from rest_framework.exceptions import ValidationError 39from rest_framework.fields import ( 40 BooleanField, 41 CharField, 42 ChoiceField, 43 DateTimeField, 44 IntegerField, 45 ListField, 46 SerializerMethodField, 47 UUIDField, 48) 49from rest_framework.permissions import IsAuthenticated 50from rest_framework.request import Request 51from rest_framework.response import Response 52from rest_framework.serializers import ( 53 ListSerializer, 54 PrimaryKeyRelatedField, 55) 56from rest_framework.validators import UniqueValidator 57from rest_framework.viewsets import ModelViewSet 58from structlog.stdlib import get_logger 59 60from authentik.api.authentication import TokenAuthentication 61from authentik.api.search.fields import ( 62 ChoiceSearchField, 63 JSONSearchField, 64) 65from authentik.api.validation import validate 66from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT 67from authentik.brands.models import Brand 68from authentik.core.api.used_by import UsedByMixin 69from authentik.core.api.utils import ( 70 JSONDictField, 71 LinkSerializer, 72 ModelSerializer, 73 PassiveSerializer, 74) 75from authentik.core.middleware import ( 76 SESSION_KEY_IMPERSONATE_ORIGINAL_USER, 77 SESSION_KEY_IMPERSONATE_USER, 78) 79from authentik.core.models import ( 80 USER_ATTRIBUTE_TOKEN_EXPIRING, 81 USER_PATH_SERVICE_ACCOUNT, 82 USERNAME_MAX_LENGTH, 83 Group, 84 Session, 85 Token, 86 TokenIntents, 87 User, 88 UserTypes, 89 default_token_duration, 90) 91from authentik.endpoints.connectors.agent.auth import AgentAuth 92from authentik.events.models import Event, EventAction 93from authentik.flows.exceptions import FlowNonApplicableException 94from authentik.flows.models import FlowToken 95from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner 96from authentik.flows.views.executor import QS_KEY_TOKEN 97from authentik.lib.avatars import get_avatar 98from authentik.lib.utils.reflection import ConditionalInheritance 99from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator 100from authentik.rbac.api.roles import RoleSerializer 101from authentik.rbac.decorators import permission_required 102from authentik.rbac.models import Role, get_permission_choices 103from authentik.stages.email.flow import pickle_flow_token_for_email 104from authentik.stages.email.models import EmailStage 105from authentik.stages.email.tasks import send_mails 106from authentik.stages.email.utils import TemplateEmailMessage 107 108LOGGER = get_logger() 109 110INVALID_PASSWORD_HASH_MESSAGE = gettext_lazy( 111 "Invalid password hash format. Must be a valid Django password hash." 112) 113 114 115class ParamUserSerializer(PassiveSerializer): 116 """Partial serializer for query parameters to select a user""" 117 118 user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False) 119 120 121class PartialGroupSerializer(ModelSerializer): 122 """Partial Group Serializer, does not include child relations.""" 123 124 attributes = JSONDictField(required=False) 125 126 class Meta: 127 model = Group 128 fields = [ 129 "pk", 130 "num_pk", 131 "name", 132 "is_superuser", 133 "attributes", 134 ] 135 136 137class UserSerializer(ModelSerializer): 138 """User Serializer""" 139 140 is_superuser = SerializerMethodField() 141 avatar = SerializerMethodField() 142 attributes = JSONDictField(required=False) 143 groups = PrimaryKeyRelatedField( 144 allow_empty=True, 145 many=True, 146 queryset=Group.objects.all().order_by("name"), 147 default=list, 148 ) 149 groups_obj = SerializerMethodField(allow_null=True) 150 roles = PrimaryKeyRelatedField( 151 allow_empty=True, 152 many=True, 153 queryset=Role.objects.all().order_by("name"), 154 default=list, 155 ) 156 roles_obj = SerializerMethodField(allow_null=True) 157 uid = CharField(read_only=True) 158 username = CharField( 159 max_length=USERNAME_MAX_LENGTH, 160 validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))], 161 ) 162 163 @property 164 def _should_include_groups(self) -> bool: 165 request: Request = self.context.get("request", None) 166 if not request: 167 return True 168 return str(request.query_params.get("include_groups", "true")).lower() == "true" 169 170 @property 171 def _should_include_roles(self) -> bool: 172 request: Request = self.context.get("request", None) 173 if not request: 174 return True 175 return str(request.query_params.get("include_roles", "true")).lower() == "true" 176 177 @extend_schema_field(BooleanField) 178 def get_is_superuser(self, instance: User) -> bool: 179 """Use annotation if available to avoid N+1 query""" 180 ann = getattr(instance, "_annotated_is_superuser", None) 181 if ann is not None: 182 return ann 183 return instance.is_superuser 184 185 @extend_schema_field(PartialGroupSerializer(many=True)) 186 def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None: 187 if not self._should_include_groups: 188 return None 189 return PartialGroupSerializer(instance.groups, many=True).data 190 191 @extend_schema_field(RoleSerializer(many=True)) 192 def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None: 193 if not self._should_include_roles: 194 return None 195 return RoleSerializer(instance.roles, many=True).data 196 197 def __init__(self, *args, **kwargs): 198 """Setting password and permissions directly is allowed only in blueprints.""" 199 super().__init__(*args, **kwargs) 200 if SERIALIZER_CONTEXT_BLUEPRINT in self.context: 201 self.fields["password"] = CharField(required=False, allow_null=True) 202 self.fields["password_hash"] = CharField(required=False, allow_null=True) 203 self.fields["permissions"] = ListField( 204 required=False, 205 child=ChoiceField(choices=get_permission_choices()), 206 ) 207 208 def create(self, validated_data: dict) -> User: 209 """Create a user, with blueprint-only password and permission writes.""" 210 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 211 if is_blueprint: 212 password = validated_data.pop("password", None) 213 password_hash = validated_data.pop("password_hash", None) 214 permissions = validated_data.pop("permissions", []) 215 self._validate_password_inputs(password, password_hash) 216 217 instance: User = super().create(validated_data) 218 if is_blueprint: 219 self._set_password(instance, password, password_hash) 220 perms_qs = Permission.objects.filter( 221 codename__in=[permission.split(".")[1] for permission in permissions] 222 ).values_list("content_type__app_label", "codename") 223 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 224 instance.assign_perms_to_managed_role(perms_list) 225 self._ensure_password_not_empty(instance) 226 return instance 227 228 def update(self, instance: User, validated_data: dict) -> User: 229 """Update a user, with blueprint-only password and permission writes.""" 230 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 231 if is_blueprint: 232 password = validated_data.pop("password", None) 233 password_hash = validated_data.pop("password_hash", None) 234 permissions = validated_data.pop("permissions", []) 235 self._validate_password_inputs(password, password_hash) 236 237 instance = super().update(instance, validated_data) 238 if is_blueprint: 239 self._set_password(instance, password, password_hash) 240 perms_qs = Permission.objects.filter( 241 codename__in=[permission.split(".")[1] for permission in permissions] 242 ).values_list("content_type__app_label", "codename") 243 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 244 instance.assign_perms_to_managed_role(perms_list) 245 self._ensure_password_not_empty(instance) 246 return instance 247 248 def _validate_password_inputs(self, password: str | None, password_hash: str | None): 249 """Validate mutually-exclusive password inputs before any model mutation.""" 250 if password is not None and password_hash is not None: 251 raise ValidationError(_("Cannot set both password and password_hash. Use only one.")) 252 if password_hash is None: 253 return 254 try: 255 User.validate_password_hash(password_hash) 256 except ValueError as exc: 257 LOGGER.warning("Failed to identify password hash format", exc_info=exc) 258 raise ValidationError(INVALID_PASSWORD_HASH_MESSAGE) from exc 259 260 def _set_password(self, instance: User, password: str | None, password_hash: str | None = None): 261 """Set password from plain text or hash.""" 262 if password_hash is not None: 263 instance.set_password_from_hash(password_hash) 264 instance.save() 265 elif password: 266 instance.set_password(password) 267 instance.save() 268 269 def _ensure_password_not_empty(self, instance: User): 270 """Store an explicit unusable password instead of an empty password field.""" 271 if len(instance.password) == 0: 272 instance.set_unusable_password() 273 instance.save() 274 275 def get_avatar(self, user: User) -> str: 276 """User's avatar, either a http/https URL or a data URI""" 277 return get_avatar(user, self.context.get("request")) 278 279 def validate_path(self, path: str) -> str: 280 """Validate path""" 281 if path[:1] == "/" or path[-1] == "/": 282 raise ValidationError(_("No leading or trailing slashes allowed.")) 283 for segment in path.split("/"): 284 if segment == "": 285 raise ValidationError(_("No empty segments in user path allowed.")) 286 return path 287 288 def validate_type(self, user_type: str) -> str: 289 """Validate user type, internal_service_account is an internal value""" 290 if ( 291 self.instance 292 and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT 293 and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value 294 ): 295 raise ValidationError(_("Can't change internal service account to other user type.")) 296 if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value: 297 raise ValidationError(_("Setting a user to internal service account is not allowed.")) 298 return user_type 299 300 def validate_groups(self, groups: list) -> list: 301 """Require enable_group_superuser permission when adding a user to a superuser group.""" 302 request: Request = self.context.get("request", None) 303 if not request: 304 return groups 305 current_groups = set(self.instance.groups.all()) if self.instance else set() 306 for group in groups: 307 if not group.is_superuser: 308 continue 309 if group in current_groups: 310 continue 311 if not request.user.has_perm("authentik_core.enable_group_superuser"): 312 raise ValidationError( 313 _("User does not have permission to add members to a superuser group.") 314 ) 315 return groups 316 317 def validate_roles(self, roles: list) -> list: 318 """Require change_role permission when assigning new roles to a user.""" 319 request: Request = self.context.get("request", None) 320 if not request: 321 return roles 322 current_roles = set(self.instance.roles.all()) if self.instance else set() 323 new_roles = [r for r in roles if r not in current_roles] 324 if not new_roles: 325 return roles 326 if not request.user.has_perm("authentik_rbac.change_role"): 327 raise ValidationError(_("User does not have permission to assign roles.")) 328 return roles 329 330 def validate(self, attrs: dict) -> dict: 331 if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT: 332 raise ValidationError(_("Can't modify internal service account users")) 333 return super().validate(attrs) 334 335 class Meta: 336 model = User 337 fields = [ 338 "pk", 339 "username", 340 "name", 341 "is_active", 342 "last_login", 343 "date_joined", 344 "is_superuser", 345 "groups", 346 "groups_obj", 347 "roles", 348 "roles_obj", 349 "email", 350 "avatar", 351 "attributes", 352 "uid", 353 "path", 354 "type", 355 "uuid", 356 "password_change_date", 357 "last_updated", 358 ] 359 extra_kwargs = { 360 "name": {"allow_blank": True}, 361 "date_joined": {"read_only": True}, 362 "password_change_date": {"read_only": True}, 363 } 364 365 366class UserSelfSerializer(ModelSerializer): 367 """User Serializer for information a user can retrieve about themselves""" 368 369 is_superuser = BooleanField(read_only=True) 370 avatar = SerializerMethodField() 371 groups = SerializerMethodField() 372 roles = SerializerMethodField() 373 uid = CharField(read_only=True) 374 settings = SerializerMethodField() 375 system_permissions = SerializerMethodField() 376 377 def get_avatar(self, user: User) -> str: 378 """User's avatar, either a http/https URL or a data URI""" 379 return get_avatar(user, self.context.get("request")) 380 381 @extend_schema_field( 382 ListSerializer( 383 child=inline_serializer( 384 "UserSelfGroups", 385 { 386 "name": CharField(read_only=True), 387 "pk": CharField(read_only=True), 388 }, 389 ) 390 ) 391 ) 392 def get_groups(self, _: User): 393 """Return only the group names a user is member of""" 394 for group in self.instance.all_groups().order_by("name"): 395 yield { 396 "name": group.name, 397 "pk": group.pk, 398 } 399 400 @extend_schema_field( 401 ListSerializer( 402 child=inline_serializer( 403 "UserSelfRoles", 404 { 405 "name": CharField(read_only=True), 406 "pk": CharField(read_only=True), 407 }, 408 ) 409 ) 410 ) 411 def get_roles(self, _: User): 412 """Return only the roles a user is member of""" 413 for role in self.instance.all_roles().order_by("name"): 414 yield { 415 "name": role.name, 416 "pk": role.pk, 417 } 418 419 def get_settings(self, user: User) -> dict[str, Any]: 420 """Get user settings with brand and group settings applied""" 421 return user.group_attributes(self._context["request"]).get("settings", {}) 422 423 def get_system_permissions(self, user: User) -> list[str]: 424 """Get all system permissions assigned to the user""" 425 return list( 426 x.split(".", maxsplit=1)[1] 427 for x in user.get_all_permissions() 428 if x.startswith("authentik_rbac") 429 ) 430 431 class Meta: 432 model = User 433 fields = [ 434 "pk", 435 "username", 436 "name", 437 "is_active", 438 "is_superuser", 439 "groups", 440 "roles", 441 "email", 442 "avatar", 443 "uid", 444 "settings", 445 "type", 446 "system_permissions", 447 ] 448 extra_kwargs = { 449 "is_active": {"read_only": True}, 450 "name": {"allow_blank": True}, 451 } 452 453 454class SessionUserSerializer(PassiveSerializer): 455 """Response for the /user/me endpoint, returns the currently active user (as `user` property) 456 and, if this user is being impersonated, the original user in the `original` property. 457 """ 458 459 user = UserSelfSerializer() 460 original = UserSelfSerializer(required=False) 461 462 463class UserPasswordSetSerializer(PassiveSerializer): 464 """Payload to set a users' password directly""" 465 466 password = CharField(required=True) 467 468 469class UserPasswordHashSetSerializer(PassiveSerializer): 470 """Payload to set a users' password hash directly""" 471 472 password = CharField(required=True) 473 474 475class UserServiceAccountSerializer(PassiveSerializer): 476 """Payload to create a service account""" 477 478 name = CharField( 479 required=True, 480 validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))], 481 ) 482 create_group = BooleanField(default=False) 483 expiring = BooleanField(default=True) 484 expires = DateTimeField( 485 required=False, 486 help_text="If not provided, valid for 360 days", 487 ) 488 489 490class UserRecoveryLinkSerializer(PassiveSerializer): 491 """Payload to create a recovery link""" 492 493 token_duration = CharField(required=False) 494 495 496class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer): 497 """Payload to create and email a recovery link""" 498 499 email_stage = UUIDField() 500 501 502class UsersFilter(FilterSet): 503 """Filter for users""" 504 505 attributes = CharFilter( 506 field_name="attributes", 507 lookup_expr="", 508 label="Attributes", 509 method="filter_attributes", 510 ) 511 512 date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt") 513 date_joined = IsoDateTimeFilter(field_name="date_joined") 514 date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt") 515 516 last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt") 517 last_updated = IsoDateTimeFilter(field_name="last_updated") 518 last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt") 519 520 last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt") 521 last_login = IsoDateTimeFilter(field_name="last_login") 522 last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt") 523 last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull") 524 525 is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser") 526 uuid = UUIDFilter(field_name="uuid") 527 528 path = CharFilter(field_name="path") 529 path_startswith = CharFilter(field_name="path", lookup_expr="startswith") 530 531 type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type") 532 533 groups_by_name = ModelMultipleChoiceFilter( 534 field_name="groups__name", 535 to_field_name="name", 536 queryset=Group.objects.all().order_by("name"), 537 ) 538 groups_by_pk = ModelMultipleChoiceFilter( 539 field_name="groups", 540 queryset=Group.objects.all().order_by("name"), 541 ) 542 543 roles_by_name = ModelMultipleChoiceFilter( 544 field_name="roles__name", 545 to_field_name="name", 546 queryset=Role.objects.all().order_by("name"), 547 ) 548 roles_by_pk = ModelMultipleChoiceFilter( 549 field_name="roles", 550 queryset=Role.objects.all().order_by("name"), 551 ) 552 553 def filter_is_superuser(self, queryset, name, value): 554 if value: 555 return queryset.filter(groups__is_superuser=True).distinct() 556 return queryset.exclude(groups__is_superuser=True).distinct() 557 558 def filter_attributes(self, queryset, name, value): 559 """Filter attributes by query args""" 560 try: 561 value = loads(value) 562 except ValueError: 563 raise ValidationError(_("filter: failed to parse JSON")) from None 564 if not isinstance(value, dict): 565 raise ValidationError(_("filter: value must be key:value mapping")) 566 qs = {} 567 for key, _value in value.items(): 568 qs[f"attributes__{key}"] = _value 569 try: 570 __ = len(queryset.filter(**qs)) 571 return queryset.filter(**qs) 572 except ValueError: 573 return queryset 574 575 class Meta: 576 model = User 577 fields = [ 578 "username", 579 "email", 580 "date_joined", 581 "last_updated", 582 "last_login", 583 "name", 584 "is_active", 585 "is_superuser", 586 "attributes", 587 "groups_by_name", 588 "groups_by_pk", 589 "roles_by_name", 590 "roles_by_pk", 591 "type", 592 ] 593 594 595class UserViewSet( 596 ConditionalInheritance( 597 "authentik.enterprise.stages.account_lockdown.api.UserAccountLockdownMixin" 598 ), 599 ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"), 600 UsedByMixin, 601 ModelViewSet, 602): 603 """User Viewset""" 604 605 queryset = User.objects.none() 606 ordering = ["username", "date_joined", "last_updated", "last_login"] 607 serializer_class = UserSerializer 608 filterset_class = UsersFilter 609 search_fields = ["email", "name", "uuid", "username"] 610 authentication_classes = [ 611 TokenAuthentication, 612 SessionAuthentication, 613 AgentAuth, 614 ] 615 616 def get_ql_fields(self): 617 return [ 618 StrField(User, "username"), 619 StrField(User, "name"), 620 StrField(User, "email"), 621 StrField(User, "path"), 622 BoolField(User, "is_active", nullable=True), 623 ChoiceSearchField(User, "type"), 624 JSONSearchField(User, "attributes"), 625 ] 626 627 def get_queryset(self): 628 base_qs = User.objects.all().exclude_anonymous() 629 # Always prefetch groups since group PKs are always serialized. 630 # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise. 631 if self.serializer_class(context={"request": self.request})._should_include_groups: 632 base_qs = base_qs.prefetch_related("groups") 633 else: 634 base_qs = base_qs.prefetch_related( 635 Prefetch("groups", queryset=Group.objects.all().only("group_uuid")) 636 ) 637 if self.serializer_class(context={"request": self.request})._should_include_roles: 638 base_qs = base_qs.prefetch_related("roles") 639 else: 640 base_qs = base_qs.prefetch_related( 641 Prefetch("roles", queryset=Role.objects.all().only("uuid")) 642 ) 643 # Annotate is_superuser to avoid N+1 query per user 644 base_qs = base_qs.annotate( 645 _annotated_is_superuser=Exists( 646 Group.objects.filter( 647 is_superuser=True, 648 ).filter( 649 Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk")) 650 ) 651 ) 652 ) 653 return base_qs 654 655 @extend_schema( 656 parameters=[ 657 OpenApiParameter("include_groups", bool, default=True), 658 OpenApiParameter("include_roles", bool, default=True), 659 ] 660 ) 661 def list(self, request, *args, **kwargs): 662 return super().list(request, *args, **kwargs) 663 664 def _create_recovery_link( 665 self, token_duration: str | None, for_email=False 666 ) -> tuple[str, Token]: 667 """Create a recovery link (when the current brand has a recovery flow set), 668 that can either be shown to an admin or sent to the user directly""" 669 brand: Brand = self.request.brand 670 # Check that there is a recovery flow, if not return an error 671 flow = brand.flow_recovery 672 if not flow: 673 raise ValidationError({"non_field_errors": _("No recovery flow set.")}) 674 user: User = self.get_object() 675 planner = FlowPlanner(flow) 676 planner.allow_empty_flows = True 677 self.request._request.user = AnonymousUser() 678 try: 679 plan = planner.plan( 680 self.request._request, 681 { 682 PLAN_CONTEXT_PENDING_USER: user, 683 }, 684 ) 685 except FlowNonApplicableException: 686 raise ValidationError( 687 {"non_field_errors": _("Recovery flow not applicable to user")} 688 ) from None 689 _plan = FlowToken.pickle(plan) 690 if for_email: 691 _plan = pickle_flow_token_for_email(plan) 692 expires = default_token_duration() 693 if token_duration: 694 timedelta_string_validator(token_duration) 695 expires = now() + timedelta_from_string(token_duration) 696 token, __ = FlowToken.objects.update_or_create( 697 identifier=f"{user.uid}-password-reset", 698 defaults={ 699 "user": user, 700 "flow": flow, 701 "_plan": _plan, 702 "revoke_on_execution": not for_email, 703 "expires": expires, 704 }, 705 ) 706 querystring = urlencode({QS_KEY_TOKEN: token.key}) 707 link = self.request.build_absolute_uri( 708 reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}) 709 + f"?{querystring}" 710 ) 711 return link, token 712 713 @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"]) 714 @extend_schema( 715 request=UserServiceAccountSerializer, 716 responses={ 717 200: inline_serializer( 718 "UserServiceAccountResponse", 719 { 720 "username": CharField(required=True), 721 "token": CharField(required=True), 722 "user_uid": CharField(required=True), 723 "user_pk": IntegerField(required=True), 724 "group_pk": CharField(required=False), 725 }, 726 ) 727 }, 728 ) 729 @action( 730 detail=False, 731 methods=["POST"], 732 pagination_class=None, 733 filter_backends=[], 734 ) 735 @validate(UserServiceAccountSerializer) 736 def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response: 737 """Create a new user account that is marked as a service account""" 738 expires = body.validated_data.get("expires", now() + timedelta(days=360)) 739 740 username = body.validated_data["name"] 741 expiring = body.validated_data["expiring"] 742 with atomic(): 743 try: 744 user: User = User.objects.create( 745 username=username, 746 name=username, 747 type=UserTypes.SERVICE_ACCOUNT, 748 attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring}, 749 path=USER_PATH_SERVICE_ACCOUNT, 750 ) 751 user.set_unusable_password() 752 user.save() 753 754 response = { 755 "username": user.username, 756 "user_uid": user.uid, 757 "user_pk": user.pk, 758 } 759 if body.validated_data["create_group"] and self.request.user.has_perm( 760 "authentik_core.add_group" 761 ): 762 group = Group.objects.create(name=username) 763 group.users.add(user) 764 response["group_pk"] = str(group.pk) 765 token = Token.objects.create( 766 identifier=slugify(f"service-account-{username}-password"), 767 intent=TokenIntents.INTENT_APP_PASSWORD, 768 user=user, 769 expires=expires, 770 expiring=expiring, 771 ) 772 response["token"] = token.key 773 return Response(response) 774 except IntegrityError as exc: 775 error_msg = str(exc).lower() 776 777 if "unique" in error_msg: 778 return Response( 779 data={ 780 "non_field_errors": [ 781 _("A user/group with these details already exists") 782 ] 783 }, 784 status=400, 785 ) 786 else: 787 LOGGER.warning("Service account creation failed", exc=exc) 788 return Response( 789 data={"non_field_errors": [_("Unable to create user")]}, 790 status=400, 791 ) 792 except (ValueError, TypeError) as exc: 793 LOGGER.error("Unexpected error during service account creation", exc=exc) 794 return Response( 795 data={"non_field_errors": [_("Unknown error occurred")]}, 796 status=500, 797 ) 798 799 @extend_schema(responses={200: SessionUserSerializer(many=False)}) 800 @action( 801 url_path="me", 802 url_name="me", 803 detail=False, 804 pagination_class=None, 805 filter_backends=[], 806 ) 807 def user_me(self, request: Request) -> Response: 808 """Get information about current user""" 809 context = {"request": request} 810 serializer = SessionUserSerializer( 811 data={"user": UserSelfSerializer(instance=request.user, context=context).data} 812 ) 813 if SESSION_KEY_IMPERSONATE_USER in request._request.session: 814 serializer.initial_data["original"] = UserSelfSerializer( 815 instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER], 816 context=context, 817 ).data 818 self.request.session.modified = True 819 return Response(serializer.initial_data) 820 821 def _update_session_hash_after_password_change(self, request: Request, user: User): 822 if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session: 823 LOGGER.debug("Updating session hash after password change") 824 update_session_auth_hash(self.request, user) 825 826 @permission_required("authentik_core.reset_user_password") 827 @extend_schema( 828 request=UserPasswordSetSerializer, 829 responses={ 830 204: OpenApiResponse(description="Successfully changed password"), 831 400: OpenApiResponse(description="Bad request"), 832 }, 833 ) 834 @action( 835 detail=True, 836 methods=["POST"], 837 permission_classes=[IsAuthenticated], 838 ) 839 @validate(UserPasswordSetSerializer) 840 def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response: 841 """Set password for user""" 842 user: User = self.get_object() 843 try: 844 user.set_password(body.validated_data["password"], request=request) 845 user.save() 846 except (ValidationError, IntegrityError) as exc: 847 LOGGER.debug("Failed to set password", exc=exc) 848 return Response(status=400) 849 self._update_session_hash_after_password_change(request, user) 850 return Response(status=204) 851 852 @permission_required("authentik_core.reset_user_password") 853 @extend_schema( 854 request=UserPasswordHashSetSerializer, 855 responses={ 856 204: OpenApiResponse(description="Successfully changed password"), 857 400: OpenApiResponse(description="Bad request"), 858 }, 859 ) 860 @action( 861 detail=True, 862 methods=["POST"], 863 permission_classes=[IsAuthenticated], 864 ) 865 @validate(UserPasswordHashSetSerializer) 866 def set_password_hash( 867 self, request: Request, pk: int, body: UserPasswordHashSetSerializer 868 ) -> Response: 869 """Set a user's password from a pre-hashed Django password value. 870 871 Submit the Django password hash in the shared ``password`` request field. 872 873 This updates authentik's local password verifier only. It does not attempt 874 to propagate the password change to LDAP or Kerberos because no raw password 875 is available from the request payload. 876 """ 877 user: User = self.get_object() 878 try: 879 user.set_password_from_hash(body.validated_data["password"], request=request) 880 user.save() 881 except ValueError as exc: 882 LOGGER.debug("Failed to set password hash", exc=exc) 883 return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400) 884 except (ValidationError, IntegrityError) as exc: 885 LOGGER.debug("Failed to set password hash", exc=exc) 886 return Response(status=400) 887 self._update_session_hash_after_password_change(request, user) 888 return Response(status=204) 889 890 @permission_required("authentik_core.reset_user_password") 891 @extend_schema( 892 request=UserRecoveryLinkSerializer, 893 responses={ 894 "200": LinkSerializer(many=False), 895 }, 896 ) 897 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 898 @validate(UserRecoveryLinkSerializer) 899 def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response: 900 """Create a temporary link that a user can use to recover their account""" 901 link, _ = self._create_recovery_link( 902 token_duration=body.validated_data.get("token_duration") 903 ) 904 return Response({"link": link}) 905 906 @permission_required("authentik_core.reset_user_password") 907 @extend_schema( 908 request=UserRecoveryEmailSerializer, 909 responses={ 910 "204": OpenApiResponse(description="Successfully sent recover email"), 911 }, 912 ) 913 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 914 @validate(UserRecoveryEmailSerializer) 915 def recovery_email( 916 self, request: Request, pk: int, body: UserRecoveryEmailSerializer 917 ) -> Response: 918 """Send an email with a temporary link that a user can use to recover their account""" 919 email_error_message = _("User does not have an email address set.") 920 stage_error_message = _("Email stage not found.") 921 user: User = self.get_object() 922 if not user.email: 923 LOGGER.debug("User doesn't have an email address") 924 raise ValidationError({"non_field_errors": email_error_message}) 925 if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()): 926 LOGGER.debug("Email stage does not exist") 927 raise ValidationError({"non_field_errors": stage_error_message}) 928 if not request.user.has_perm("authentik_stages_email.view_emailstage", stage): 929 LOGGER.debug("User has no view access to email stage") 930 raise ValidationError({"non_field_errors": stage_error_message}) 931 link, token = self._create_recovery_link( 932 token_duration=body.validated_data.get("token_duration"), for_email=True 933 ) 934 message = TemplateEmailMessage( 935 subject=_(stage.subject), 936 to=[(user.name, user.email)], 937 template_name=stage.template, 938 language=user.locale(request), 939 template_context={ 940 "url": link, 941 "user": user, 942 "expires": token.expires, 943 }, 944 ) 945 send_mails(stage, message) 946 return Response(status=204) 947 948 @permission_required("authentik_core.impersonate") 949 @extend_schema( 950 request=inline_serializer( 951 "ImpersonationSerializer", 952 { 953 "reason": CharField(required=True), 954 }, 955 ), 956 responses={ 957 204: OpenApiResponse(description="Successfully started impersonation"), 958 }, 959 ) 960 @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated]) 961 def impersonate(self, request: Request, pk: int) -> Response: 962 """Impersonate a user""" 963 if not request.tenant.impersonation: 964 LOGGER.debug("User attempted to impersonate", user=request.user) 965 return Response(status=401) 966 user_to_be = self.get_object() 967 reason = request.data.get("reason", "") 968 # Check both object-level perms and global perms 969 if not request.user.has_perm( 970 "authentik_core.impersonate", user_to_be 971 ) and not request.user.has_perm("authentik_core.impersonate"): 972 LOGGER.debug( 973 "User attempted to impersonate without permissions", 974 user=request.user, 975 ) 976 return Response(status=403) 977 if user_to_be.pk == self.request.user.pk: 978 LOGGER.debug("User attempted to impersonate themselves", user=request.user) 979 return Response(status=401) 980 if not reason and request.tenant.impersonation_require_reason: 981 LOGGER.debug( 982 "User attempted to impersonate without providing a reason", 983 user=request.user, 984 ) 985 raise ValidationError({"reason": _("This field is required.")}) 986 987 request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user 988 request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be 989 990 Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be) 991 992 return Response(status=204) 993 994 @extend_schema( 995 request=None, 996 responses={ 997 "204": OpenApiResponse(description="Successfully ended impersonation"), 998 }, 999 ) 1000 @action(detail=False, methods=["GET"]) 1001 def impersonate_end(self, request: Request) -> Response: 1002 """End Impersonation a user""" 1003 if ( 1004 SESSION_KEY_IMPERSONATE_USER not in request.session 1005 or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session 1006 ): 1007 LOGGER.debug("Can't end impersonation", user=request.user) 1008 return Response(status=204) 1009 1010 original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1011 1012 del request.session[SESSION_KEY_IMPERSONATE_USER] 1013 del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1014 1015 Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user) 1016 1017 return Response(status=204) 1018 1019 @extend_schema( 1020 responses={ 1021 200: inline_serializer( 1022 "UserPathSerializer", 1023 {"paths": ListField(child=CharField(), read_only=True)}, 1024 ) 1025 }, 1026 parameters=[ 1027 OpenApiParameter( 1028 name="search", 1029 location=OpenApiParameter.QUERY, 1030 type=OpenApiTypes.STR, 1031 ) 1032 ], 1033 ) 1034 @action(detail=False, pagination_class=None) 1035 def paths(self, request: Request) -> Response: 1036 """Get all user paths""" 1037 return Response( 1038 data={ 1039 "paths": list( 1040 self.filter_queryset(self.get_queryset()) 1041 .values("path") 1042 .distinct() 1043 .order_by("path") 1044 .values_list("path", flat=True) 1045 ) 1046 } 1047 ) 1048 1049 def partial_update(self, request: Request, *args, **kwargs) -> Response: 1050 response = super().partial_update(request, *args, **kwargs) 1051 instance: User = self.get_object() 1052 if not instance.is_active: 1053 Session.objects.filter(authenticatedsession__user=instance).delete() 1054 LOGGER.debug("Deleted user's sessions", user=instance.username) 1055 return response
116class ParamUserSerializer(PassiveSerializer): 117 """Partial serializer for query parameters to select a user""" 118 119 user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)
Partial serializer for query parameters to select a user
Inherited Members
122class PartialGroupSerializer(ModelSerializer): 123 """Partial Group Serializer, does not include child relations.""" 124 125 attributes = JSONDictField(required=False) 126 127 class Meta: 128 model = Group 129 fields = [ 130 "pk", 131 "num_pk", 132 "name", 133 "is_superuser", 134 "attributes", 135 ]
Partial Group Serializer, does not include child relations.
Inherited Members
127 class Meta: 128 model = Group 129 fields = [ 130 "pk", 131 "num_pk", 132 "name", 133 "is_superuser", 134 "attributes", 135 ]
138class UserSerializer(ModelSerializer): 139 """User Serializer""" 140 141 is_superuser = SerializerMethodField() 142 avatar = SerializerMethodField() 143 attributes = JSONDictField(required=False) 144 groups = PrimaryKeyRelatedField( 145 allow_empty=True, 146 many=True, 147 queryset=Group.objects.all().order_by("name"), 148 default=list, 149 ) 150 groups_obj = SerializerMethodField(allow_null=True) 151 roles = PrimaryKeyRelatedField( 152 allow_empty=True, 153 many=True, 154 queryset=Role.objects.all().order_by("name"), 155 default=list, 156 ) 157 roles_obj = SerializerMethodField(allow_null=True) 158 uid = CharField(read_only=True) 159 username = CharField( 160 max_length=USERNAME_MAX_LENGTH, 161 validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))], 162 ) 163 164 @property 165 def _should_include_groups(self) -> bool: 166 request: Request = self.context.get("request", None) 167 if not request: 168 return True 169 return str(request.query_params.get("include_groups", "true")).lower() == "true" 170 171 @property 172 def _should_include_roles(self) -> bool: 173 request: Request = self.context.get("request", None) 174 if not request: 175 return True 176 return str(request.query_params.get("include_roles", "true")).lower() == "true" 177 178 @extend_schema_field(BooleanField) 179 def get_is_superuser(self, instance: User) -> bool: 180 """Use annotation if available to avoid N+1 query""" 181 ann = getattr(instance, "_annotated_is_superuser", None) 182 if ann is not None: 183 return ann 184 return instance.is_superuser 185 186 @extend_schema_field(PartialGroupSerializer(many=True)) 187 def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None: 188 if not self._should_include_groups: 189 return None 190 return PartialGroupSerializer(instance.groups, many=True).data 191 192 @extend_schema_field(RoleSerializer(many=True)) 193 def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None: 194 if not self._should_include_roles: 195 return None 196 return RoleSerializer(instance.roles, many=True).data 197 198 def __init__(self, *args, **kwargs): 199 """Setting password and permissions directly is allowed only in blueprints.""" 200 super().__init__(*args, **kwargs) 201 if SERIALIZER_CONTEXT_BLUEPRINT in self.context: 202 self.fields["password"] = CharField(required=False, allow_null=True) 203 self.fields["password_hash"] = CharField(required=False, allow_null=True) 204 self.fields["permissions"] = ListField( 205 required=False, 206 child=ChoiceField(choices=get_permission_choices()), 207 ) 208 209 def create(self, validated_data: dict) -> User: 210 """Create a user, with blueprint-only password and permission writes.""" 211 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 212 if is_blueprint: 213 password = validated_data.pop("password", None) 214 password_hash = validated_data.pop("password_hash", None) 215 permissions = validated_data.pop("permissions", []) 216 self._validate_password_inputs(password, password_hash) 217 218 instance: User = super().create(validated_data) 219 if is_blueprint: 220 self._set_password(instance, password, password_hash) 221 perms_qs = Permission.objects.filter( 222 codename__in=[permission.split(".")[1] for permission in permissions] 223 ).values_list("content_type__app_label", "codename") 224 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 225 instance.assign_perms_to_managed_role(perms_list) 226 self._ensure_password_not_empty(instance) 227 return instance 228 229 def update(self, instance: User, validated_data: dict) -> User: 230 """Update a user, with blueprint-only password and permission writes.""" 231 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 232 if is_blueprint: 233 password = validated_data.pop("password", None) 234 password_hash = validated_data.pop("password_hash", None) 235 permissions = validated_data.pop("permissions", []) 236 self._validate_password_inputs(password, password_hash) 237 238 instance = super().update(instance, validated_data) 239 if is_blueprint: 240 self._set_password(instance, password, password_hash) 241 perms_qs = Permission.objects.filter( 242 codename__in=[permission.split(".")[1] for permission in permissions] 243 ).values_list("content_type__app_label", "codename") 244 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 245 instance.assign_perms_to_managed_role(perms_list) 246 self._ensure_password_not_empty(instance) 247 return instance 248 249 def _validate_password_inputs(self, password: str | None, password_hash: str | None): 250 """Validate mutually-exclusive password inputs before any model mutation.""" 251 if password is not None and password_hash is not None: 252 raise ValidationError(_("Cannot set both password and password_hash. Use only one.")) 253 if password_hash is None: 254 return 255 try: 256 User.validate_password_hash(password_hash) 257 except ValueError as exc: 258 LOGGER.warning("Failed to identify password hash format", exc_info=exc) 259 raise ValidationError(INVALID_PASSWORD_HASH_MESSAGE) from exc 260 261 def _set_password(self, instance: User, password: str | None, password_hash: str | None = None): 262 """Set password from plain text or hash.""" 263 if password_hash is not None: 264 instance.set_password_from_hash(password_hash) 265 instance.save() 266 elif password: 267 instance.set_password(password) 268 instance.save() 269 270 def _ensure_password_not_empty(self, instance: User): 271 """Store an explicit unusable password instead of an empty password field.""" 272 if len(instance.password) == 0: 273 instance.set_unusable_password() 274 instance.save() 275 276 def get_avatar(self, user: User) -> str: 277 """User's avatar, either a http/https URL or a data URI""" 278 return get_avatar(user, self.context.get("request")) 279 280 def validate_path(self, path: str) -> str: 281 """Validate path""" 282 if path[:1] == "/" or path[-1] == "/": 283 raise ValidationError(_("No leading or trailing slashes allowed.")) 284 for segment in path.split("/"): 285 if segment == "": 286 raise ValidationError(_("No empty segments in user path allowed.")) 287 return path 288 289 def validate_type(self, user_type: str) -> str: 290 """Validate user type, internal_service_account is an internal value""" 291 if ( 292 self.instance 293 and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT 294 and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value 295 ): 296 raise ValidationError(_("Can't change internal service account to other user type.")) 297 if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value: 298 raise ValidationError(_("Setting a user to internal service account is not allowed.")) 299 return user_type 300 301 def validate_groups(self, groups: list) -> list: 302 """Require enable_group_superuser permission when adding a user to a superuser group.""" 303 request: Request = self.context.get("request", None) 304 if not request: 305 return groups 306 current_groups = set(self.instance.groups.all()) if self.instance else set() 307 for group in groups: 308 if not group.is_superuser: 309 continue 310 if group in current_groups: 311 continue 312 if not request.user.has_perm("authentik_core.enable_group_superuser"): 313 raise ValidationError( 314 _("User does not have permission to add members to a superuser group.") 315 ) 316 return groups 317 318 def validate_roles(self, roles: list) -> list: 319 """Require change_role permission when assigning new roles to a user.""" 320 request: Request = self.context.get("request", None) 321 if not request: 322 return roles 323 current_roles = set(self.instance.roles.all()) if self.instance else set() 324 new_roles = [r for r in roles if r not in current_roles] 325 if not new_roles: 326 return roles 327 if not request.user.has_perm("authentik_rbac.change_role"): 328 raise ValidationError(_("User does not have permission to assign roles.")) 329 return roles 330 331 def validate(self, attrs: dict) -> dict: 332 if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT: 333 raise ValidationError(_("Can't modify internal service account users")) 334 return super().validate(attrs) 335 336 class Meta: 337 model = User 338 fields = [ 339 "pk", 340 "username", 341 "name", 342 "is_active", 343 "last_login", 344 "date_joined", 345 "is_superuser", 346 "groups", 347 "groups_obj", 348 "roles", 349 "roles_obj", 350 "email", 351 "avatar", 352 "attributes", 353 "uid", 354 "path", 355 "type", 356 "uuid", 357 "password_change_date", 358 "last_updated", 359 ] 360 extra_kwargs = { 361 "name": {"allow_blank": True}, 362 "date_joined": {"read_only": True}, 363 "password_change_date": {"read_only": True}, 364 }
User Serializer
198 def __init__(self, *args, **kwargs): 199 """Setting password and permissions directly is allowed only in blueprints.""" 200 super().__init__(*args, **kwargs) 201 if SERIALIZER_CONTEXT_BLUEPRINT in self.context: 202 self.fields["password"] = CharField(required=False, allow_null=True) 203 self.fields["password_hash"] = CharField(required=False, allow_null=True) 204 self.fields["permissions"] = ListField( 205 required=False, 206 child=ChoiceField(choices=get_permission_choices()), 207 )
Setting password and permissions directly is allowed only in blueprints.
178 @extend_schema_field(BooleanField) 179 def get_is_superuser(self, instance: User) -> bool: 180 """Use annotation if available to avoid N+1 query""" 181 ann = getattr(instance, "_annotated_is_superuser", None) 182 if ann is not None: 183 return ann 184 return instance.is_superuser
Use annotation if available to avoid N+1 query
209 def create(self, validated_data: dict) -> User: 210 """Create a user, with blueprint-only password and permission writes.""" 211 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 212 if is_blueprint: 213 password = validated_data.pop("password", None) 214 password_hash = validated_data.pop("password_hash", None) 215 permissions = validated_data.pop("permissions", []) 216 self._validate_password_inputs(password, password_hash) 217 218 instance: User = super().create(validated_data) 219 if is_blueprint: 220 self._set_password(instance, password, password_hash) 221 perms_qs = Permission.objects.filter( 222 codename__in=[permission.split(".")[1] for permission in permissions] 223 ).values_list("content_type__app_label", "codename") 224 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 225 instance.assign_perms_to_managed_role(perms_list) 226 self._ensure_password_not_empty(instance) 227 return instance
Create a user, with blueprint-only password and permission writes.
229 def update(self, instance: User, validated_data: dict) -> User: 230 """Update a user, with blueprint-only password and permission writes.""" 231 is_blueprint = SERIALIZER_CONTEXT_BLUEPRINT in self.context 232 if is_blueprint: 233 password = validated_data.pop("password", None) 234 password_hash = validated_data.pop("password_hash", None) 235 permissions = validated_data.pop("permissions", []) 236 self._validate_password_inputs(password, password_hash) 237 238 instance = super().update(instance, validated_data) 239 if is_blueprint: 240 self._set_password(instance, password, password_hash) 241 perms_qs = Permission.objects.filter( 242 codename__in=[permission.split(".")[1] for permission in permissions] 243 ).values_list("content_type__app_label", "codename") 244 perms_list = [f"{ct}.{name}" for ct, name in perms_qs] 245 instance.assign_perms_to_managed_role(perms_list) 246 self._ensure_password_not_empty(instance) 247 return instance
Update a user, with blueprint-only password and permission writes.
276 def get_avatar(self, user: User) -> str: 277 """User's avatar, either a http/https URL or a data URI""" 278 return get_avatar(user, self.context.get("request"))
User's avatar, either a http/https URL or a data URI
280 def validate_path(self, path: str) -> str: 281 """Validate path""" 282 if path[:1] == "/" or path[-1] == "/": 283 raise ValidationError(_("No leading or trailing slashes allowed.")) 284 for segment in path.split("/"): 285 if segment == "": 286 raise ValidationError(_("No empty segments in user path allowed.")) 287 return path
Validate path
289 def validate_type(self, user_type: str) -> str: 290 """Validate user type, internal_service_account is an internal value""" 291 if ( 292 self.instance 293 and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT 294 and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value 295 ): 296 raise ValidationError(_("Can't change internal service account to other user type.")) 297 if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value: 298 raise ValidationError(_("Setting a user to internal service account is not allowed.")) 299 return user_type
Validate user type, internal_service_account is an internal value
301 def validate_groups(self, groups: list) -> list: 302 """Require enable_group_superuser permission when adding a user to a superuser group.""" 303 request: Request = self.context.get("request", None) 304 if not request: 305 return groups 306 current_groups = set(self.instance.groups.all()) if self.instance else set() 307 for group in groups: 308 if not group.is_superuser: 309 continue 310 if group in current_groups: 311 continue 312 if not request.user.has_perm("authentik_core.enable_group_superuser"): 313 raise ValidationError( 314 _("User does not have permission to add members to a superuser group.") 315 ) 316 return groups
Require enable_group_superuser permission when adding a user to a superuser group.
318 def validate_roles(self, roles: list) -> list: 319 """Require change_role permission when assigning new roles to a user.""" 320 request: Request = self.context.get("request", None) 321 if not request: 322 return roles 323 current_roles = set(self.instance.roles.all()) if self.instance else set() 324 new_roles = [r for r in roles if r not in current_roles] 325 if not new_roles: 326 return roles 327 if not request.user.has_perm("authentik_rbac.change_role"): 328 raise ValidationError(_("User does not have permission to assign roles.")) 329 return roles
Require change_role permission when assigning new roles to a user.
Inherited Members
336 class Meta: 337 model = User 338 fields = [ 339 "pk", 340 "username", 341 "name", 342 "is_active", 343 "last_login", 344 "date_joined", 345 "is_superuser", 346 "groups", 347 "groups_obj", 348 "roles", 349 "roles_obj", 350 "email", 351 "avatar", 352 "attributes", 353 "uid", 354 "path", 355 "type", 356 "uuid", 357 "password_change_date", 358 "last_updated", 359 ] 360 extra_kwargs = { 361 "name": {"allow_blank": True}, 362 "date_joined": {"read_only": True}, 363 "password_change_date": {"read_only": True}, 364 }
367class UserSelfSerializer(ModelSerializer): 368 """User Serializer for information a user can retrieve about themselves""" 369 370 is_superuser = BooleanField(read_only=True) 371 avatar = SerializerMethodField() 372 groups = SerializerMethodField() 373 roles = SerializerMethodField() 374 uid = CharField(read_only=True) 375 settings = SerializerMethodField() 376 system_permissions = SerializerMethodField() 377 378 def get_avatar(self, user: User) -> str: 379 """User's avatar, either a http/https URL or a data URI""" 380 return get_avatar(user, self.context.get("request")) 381 382 @extend_schema_field( 383 ListSerializer( 384 child=inline_serializer( 385 "UserSelfGroups", 386 { 387 "name": CharField(read_only=True), 388 "pk": CharField(read_only=True), 389 }, 390 ) 391 ) 392 ) 393 def get_groups(self, _: User): 394 """Return only the group names a user is member of""" 395 for group in self.instance.all_groups().order_by("name"): 396 yield { 397 "name": group.name, 398 "pk": group.pk, 399 } 400 401 @extend_schema_field( 402 ListSerializer( 403 child=inline_serializer( 404 "UserSelfRoles", 405 { 406 "name": CharField(read_only=True), 407 "pk": CharField(read_only=True), 408 }, 409 ) 410 ) 411 ) 412 def get_roles(self, _: User): 413 """Return only the roles a user is member of""" 414 for role in self.instance.all_roles().order_by("name"): 415 yield { 416 "name": role.name, 417 "pk": role.pk, 418 } 419 420 def get_settings(self, user: User) -> dict[str, Any]: 421 """Get user settings with brand and group settings applied""" 422 return user.group_attributes(self._context["request"]).get("settings", {}) 423 424 def get_system_permissions(self, user: User) -> list[str]: 425 """Get all system permissions assigned to the user""" 426 return list( 427 x.split(".", maxsplit=1)[1] 428 for x in user.get_all_permissions() 429 if x.startswith("authentik_rbac") 430 ) 431 432 class Meta: 433 model = User 434 fields = [ 435 "pk", 436 "username", 437 "name", 438 "is_active", 439 "is_superuser", 440 "groups", 441 "roles", 442 "email", 443 "avatar", 444 "uid", 445 "settings", 446 "type", 447 "system_permissions", 448 ] 449 extra_kwargs = { 450 "is_active": {"read_only": True}, 451 "name": {"allow_blank": True}, 452 }
User Serializer for information a user can retrieve about themselves
378 def get_avatar(self, user: User) -> str: 379 """User's avatar, either a http/https URL or a data URI""" 380 return get_avatar(user, self.context.get("request"))
User's avatar, either a http/https URL or a data URI
382 @extend_schema_field( 383 ListSerializer( 384 child=inline_serializer( 385 "UserSelfGroups", 386 { 387 "name": CharField(read_only=True), 388 "pk": CharField(read_only=True), 389 }, 390 ) 391 ) 392 ) 393 def get_groups(self, _: User): 394 """Return only the group names a user is member of""" 395 for group in self.instance.all_groups().order_by("name"): 396 yield { 397 "name": group.name, 398 "pk": group.pk, 399 }
Return only the group names a user is member of
401 @extend_schema_field( 402 ListSerializer( 403 child=inline_serializer( 404 "UserSelfRoles", 405 { 406 "name": CharField(read_only=True), 407 "pk": CharField(read_only=True), 408 }, 409 ) 410 ) 411 ) 412 def get_roles(self, _: User): 413 """Return only the roles a user is member of""" 414 for role in self.instance.all_roles().order_by("name"): 415 yield { 416 "name": role.name, 417 "pk": role.pk, 418 }
Return only the roles a user is member of
420 def get_settings(self, user: User) -> dict[str, Any]: 421 """Get user settings with brand and group settings applied""" 422 return user.group_attributes(self._context["request"]).get("settings", {})
Get user settings with brand and group settings applied
424 def get_system_permissions(self, user: User) -> list[str]: 425 """Get all system permissions assigned to the user""" 426 return list( 427 x.split(".", maxsplit=1)[1] 428 for x in user.get_all_permissions() 429 if x.startswith("authentik_rbac") 430 )
Get all system permissions assigned to the user
Inherited Members
432 class Meta: 433 model = User 434 fields = [ 435 "pk", 436 "username", 437 "name", 438 "is_active", 439 "is_superuser", 440 "groups", 441 "roles", 442 "email", 443 "avatar", 444 "uid", 445 "settings", 446 "type", 447 "system_permissions", 448 ] 449 extra_kwargs = { 450 "is_active": {"read_only": True}, 451 "name": {"allow_blank": True}, 452 }
455class SessionUserSerializer(PassiveSerializer): 456 """Response for the /user/me endpoint, returns the currently active user (as `user` property) 457 and, if this user is being impersonated, the original user in the `original` property. 458 """ 459 460 user = UserSelfSerializer() 461 original = UserSelfSerializer(required=False)
Response for the /user/me endpoint, returns the currently active user (as user property)
and, if this user is being impersonated, the original user in the original property.
Inherited Members
464class UserPasswordSetSerializer(PassiveSerializer): 465 """Payload to set a users' password directly""" 466 467 password = CharField(required=True)
Payload to set a users' password directly
Inherited Members
470class UserPasswordHashSetSerializer(PassiveSerializer): 471 """Payload to set a users' password hash directly""" 472 473 password = CharField(required=True)
Payload to set a users' password hash directly
Inherited Members
476class UserServiceAccountSerializer(PassiveSerializer): 477 """Payload to create a service account""" 478 479 name = CharField( 480 required=True, 481 validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))], 482 ) 483 create_group = BooleanField(default=False) 484 expiring = BooleanField(default=True) 485 expires = DateTimeField( 486 required=False, 487 help_text="If not provided, valid for 360 days", 488 )
Payload to create a service account
Inherited Members
491class UserRecoveryLinkSerializer(PassiveSerializer): 492 """Payload to create a recovery link""" 493 494 token_duration = CharField(required=False)
Payload to create a recovery link
Inherited Members
497class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer): 498 """Payload to create and email a recovery link""" 499 500 email_stage = UUIDField()
Payload to create and email a recovery link
503class UsersFilter(FilterSet): 504 """Filter for users""" 505 506 attributes = CharFilter( 507 field_name="attributes", 508 lookup_expr="", 509 label="Attributes", 510 method="filter_attributes", 511 ) 512 513 date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt") 514 date_joined = IsoDateTimeFilter(field_name="date_joined") 515 date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt") 516 517 last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt") 518 last_updated = IsoDateTimeFilter(field_name="last_updated") 519 last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt") 520 521 last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt") 522 last_login = IsoDateTimeFilter(field_name="last_login") 523 last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt") 524 last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull") 525 526 is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser") 527 uuid = UUIDFilter(field_name="uuid") 528 529 path = CharFilter(field_name="path") 530 path_startswith = CharFilter(field_name="path", lookup_expr="startswith") 531 532 type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type") 533 534 groups_by_name = ModelMultipleChoiceFilter( 535 field_name="groups__name", 536 to_field_name="name", 537 queryset=Group.objects.all().order_by("name"), 538 ) 539 groups_by_pk = ModelMultipleChoiceFilter( 540 field_name="groups", 541 queryset=Group.objects.all().order_by("name"), 542 ) 543 544 roles_by_name = ModelMultipleChoiceFilter( 545 field_name="roles__name", 546 to_field_name="name", 547 queryset=Role.objects.all().order_by("name"), 548 ) 549 roles_by_pk = ModelMultipleChoiceFilter( 550 field_name="roles", 551 queryset=Role.objects.all().order_by("name"), 552 ) 553 554 def filter_is_superuser(self, queryset, name, value): 555 if value: 556 return queryset.filter(groups__is_superuser=True).distinct() 557 return queryset.exclude(groups__is_superuser=True).distinct() 558 559 def filter_attributes(self, queryset, name, value): 560 """Filter attributes by query args""" 561 try: 562 value = loads(value) 563 except ValueError: 564 raise ValidationError(_("filter: failed to parse JSON")) from None 565 if not isinstance(value, dict): 566 raise ValidationError(_("filter: value must be key:value mapping")) 567 qs = {} 568 for key, _value in value.items(): 569 qs[f"attributes__{key}"] = _value 570 try: 571 __ = len(queryset.filter(**qs)) 572 return queryset.filter(**qs) 573 except ValueError: 574 return queryset 575 576 class Meta: 577 model = User 578 fields = [ 579 "username", 580 "email", 581 "date_joined", 582 "last_updated", 583 "last_login", 584 "name", 585 "is_active", 586 "is_superuser", 587 "attributes", 588 "groups_by_name", 589 "groups_by_pk", 590 "roles_by_name", 591 "roles_by_pk", 592 "type", 593 ]
Filter for users
559 def filter_attributes(self, queryset, name, value): 560 """Filter attributes by query args""" 561 try: 562 value = loads(value) 563 except ValueError: 564 raise ValidationError(_("filter: failed to parse JSON")) from None 565 if not isinstance(value, dict): 566 raise ValidationError(_("filter: value must be key:value mapping")) 567 qs = {} 568 for key, _value in value.items(): 569 qs[f"attributes__{key}"] = _value 570 try: 571 __ = len(queryset.filter(**qs)) 572 return queryset.filter(**qs) 573 except ValueError: 574 return queryset
Filter attributes by query args
576 class Meta: 577 model = User 578 fields = [ 579 "username", 580 "email", 581 "date_joined", 582 "last_updated", 583 "last_login", 584 "name", 585 "is_active", 586 "is_superuser", 587 "attributes", 588 "groups_by_name", 589 "groups_by_pk", 590 "roles_by_name", 591 "roles_by_pk", 592 "type", 593 ]
596class UserViewSet( 597 ConditionalInheritance( 598 "authentik.enterprise.stages.account_lockdown.api.UserAccountLockdownMixin" 599 ), 600 ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"), 601 UsedByMixin, 602 ModelViewSet, 603): 604 """User Viewset""" 605 606 queryset = User.objects.none() 607 ordering = ["username", "date_joined", "last_updated", "last_login"] 608 serializer_class = UserSerializer 609 filterset_class = UsersFilter 610 search_fields = ["email", "name", "uuid", "username"] 611 authentication_classes = [ 612 TokenAuthentication, 613 SessionAuthentication, 614 AgentAuth, 615 ] 616 617 def get_ql_fields(self): 618 return [ 619 StrField(User, "username"), 620 StrField(User, "name"), 621 StrField(User, "email"), 622 StrField(User, "path"), 623 BoolField(User, "is_active", nullable=True), 624 ChoiceSearchField(User, "type"), 625 JSONSearchField(User, "attributes"), 626 ] 627 628 def get_queryset(self): 629 base_qs = User.objects.all().exclude_anonymous() 630 # Always prefetch groups since group PKs are always serialized. 631 # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise. 632 if self.serializer_class(context={"request": self.request})._should_include_groups: 633 base_qs = base_qs.prefetch_related("groups") 634 else: 635 base_qs = base_qs.prefetch_related( 636 Prefetch("groups", queryset=Group.objects.all().only("group_uuid")) 637 ) 638 if self.serializer_class(context={"request": self.request})._should_include_roles: 639 base_qs = base_qs.prefetch_related("roles") 640 else: 641 base_qs = base_qs.prefetch_related( 642 Prefetch("roles", queryset=Role.objects.all().only("uuid")) 643 ) 644 # Annotate is_superuser to avoid N+1 query per user 645 base_qs = base_qs.annotate( 646 _annotated_is_superuser=Exists( 647 Group.objects.filter( 648 is_superuser=True, 649 ).filter( 650 Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk")) 651 ) 652 ) 653 ) 654 return base_qs 655 656 @extend_schema( 657 parameters=[ 658 OpenApiParameter("include_groups", bool, default=True), 659 OpenApiParameter("include_roles", bool, default=True), 660 ] 661 ) 662 def list(self, request, *args, **kwargs): 663 return super().list(request, *args, **kwargs) 664 665 def _create_recovery_link( 666 self, token_duration: str | None, for_email=False 667 ) -> tuple[str, Token]: 668 """Create a recovery link (when the current brand has a recovery flow set), 669 that can either be shown to an admin or sent to the user directly""" 670 brand: Brand = self.request.brand 671 # Check that there is a recovery flow, if not return an error 672 flow = brand.flow_recovery 673 if not flow: 674 raise ValidationError({"non_field_errors": _("No recovery flow set.")}) 675 user: User = self.get_object() 676 planner = FlowPlanner(flow) 677 planner.allow_empty_flows = True 678 self.request._request.user = AnonymousUser() 679 try: 680 plan = planner.plan( 681 self.request._request, 682 { 683 PLAN_CONTEXT_PENDING_USER: user, 684 }, 685 ) 686 except FlowNonApplicableException: 687 raise ValidationError( 688 {"non_field_errors": _("Recovery flow not applicable to user")} 689 ) from None 690 _plan = FlowToken.pickle(plan) 691 if for_email: 692 _plan = pickle_flow_token_for_email(plan) 693 expires = default_token_duration() 694 if token_duration: 695 timedelta_string_validator(token_duration) 696 expires = now() + timedelta_from_string(token_duration) 697 token, __ = FlowToken.objects.update_or_create( 698 identifier=f"{user.uid}-password-reset", 699 defaults={ 700 "user": user, 701 "flow": flow, 702 "_plan": _plan, 703 "revoke_on_execution": not for_email, 704 "expires": expires, 705 }, 706 ) 707 querystring = urlencode({QS_KEY_TOKEN: token.key}) 708 link = self.request.build_absolute_uri( 709 reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug}) 710 + f"?{querystring}" 711 ) 712 return link, token 713 714 @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"]) 715 @extend_schema( 716 request=UserServiceAccountSerializer, 717 responses={ 718 200: inline_serializer( 719 "UserServiceAccountResponse", 720 { 721 "username": CharField(required=True), 722 "token": CharField(required=True), 723 "user_uid": CharField(required=True), 724 "user_pk": IntegerField(required=True), 725 "group_pk": CharField(required=False), 726 }, 727 ) 728 }, 729 ) 730 @action( 731 detail=False, 732 methods=["POST"], 733 pagination_class=None, 734 filter_backends=[], 735 ) 736 @validate(UserServiceAccountSerializer) 737 def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response: 738 """Create a new user account that is marked as a service account""" 739 expires = body.validated_data.get("expires", now() + timedelta(days=360)) 740 741 username = body.validated_data["name"] 742 expiring = body.validated_data["expiring"] 743 with atomic(): 744 try: 745 user: User = User.objects.create( 746 username=username, 747 name=username, 748 type=UserTypes.SERVICE_ACCOUNT, 749 attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring}, 750 path=USER_PATH_SERVICE_ACCOUNT, 751 ) 752 user.set_unusable_password() 753 user.save() 754 755 response = { 756 "username": user.username, 757 "user_uid": user.uid, 758 "user_pk": user.pk, 759 } 760 if body.validated_data["create_group"] and self.request.user.has_perm( 761 "authentik_core.add_group" 762 ): 763 group = Group.objects.create(name=username) 764 group.users.add(user) 765 response["group_pk"] = str(group.pk) 766 token = Token.objects.create( 767 identifier=slugify(f"service-account-{username}-password"), 768 intent=TokenIntents.INTENT_APP_PASSWORD, 769 user=user, 770 expires=expires, 771 expiring=expiring, 772 ) 773 response["token"] = token.key 774 return Response(response) 775 except IntegrityError as exc: 776 error_msg = str(exc).lower() 777 778 if "unique" in error_msg: 779 return Response( 780 data={ 781 "non_field_errors": [ 782 _("A user/group with these details already exists") 783 ] 784 }, 785 status=400, 786 ) 787 else: 788 LOGGER.warning("Service account creation failed", exc=exc) 789 return Response( 790 data={"non_field_errors": [_("Unable to create user")]}, 791 status=400, 792 ) 793 except (ValueError, TypeError) as exc: 794 LOGGER.error("Unexpected error during service account creation", exc=exc) 795 return Response( 796 data={"non_field_errors": [_("Unknown error occurred")]}, 797 status=500, 798 ) 799 800 @extend_schema(responses={200: SessionUserSerializer(many=False)}) 801 @action( 802 url_path="me", 803 url_name="me", 804 detail=False, 805 pagination_class=None, 806 filter_backends=[], 807 ) 808 def user_me(self, request: Request) -> Response: 809 """Get information about current user""" 810 context = {"request": request} 811 serializer = SessionUserSerializer( 812 data={"user": UserSelfSerializer(instance=request.user, context=context).data} 813 ) 814 if SESSION_KEY_IMPERSONATE_USER in request._request.session: 815 serializer.initial_data["original"] = UserSelfSerializer( 816 instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER], 817 context=context, 818 ).data 819 self.request.session.modified = True 820 return Response(serializer.initial_data) 821 822 def _update_session_hash_after_password_change(self, request: Request, user: User): 823 if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session: 824 LOGGER.debug("Updating session hash after password change") 825 update_session_auth_hash(self.request, user) 826 827 @permission_required("authentik_core.reset_user_password") 828 @extend_schema( 829 request=UserPasswordSetSerializer, 830 responses={ 831 204: OpenApiResponse(description="Successfully changed password"), 832 400: OpenApiResponse(description="Bad request"), 833 }, 834 ) 835 @action( 836 detail=True, 837 methods=["POST"], 838 permission_classes=[IsAuthenticated], 839 ) 840 @validate(UserPasswordSetSerializer) 841 def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response: 842 """Set password for user""" 843 user: User = self.get_object() 844 try: 845 user.set_password(body.validated_data["password"], request=request) 846 user.save() 847 except (ValidationError, IntegrityError) as exc: 848 LOGGER.debug("Failed to set password", exc=exc) 849 return Response(status=400) 850 self._update_session_hash_after_password_change(request, user) 851 return Response(status=204) 852 853 @permission_required("authentik_core.reset_user_password") 854 @extend_schema( 855 request=UserPasswordHashSetSerializer, 856 responses={ 857 204: OpenApiResponse(description="Successfully changed password"), 858 400: OpenApiResponse(description="Bad request"), 859 }, 860 ) 861 @action( 862 detail=True, 863 methods=["POST"], 864 permission_classes=[IsAuthenticated], 865 ) 866 @validate(UserPasswordHashSetSerializer) 867 def set_password_hash( 868 self, request: Request, pk: int, body: UserPasswordHashSetSerializer 869 ) -> Response: 870 """Set a user's password from a pre-hashed Django password value. 871 872 Submit the Django password hash in the shared ``password`` request field. 873 874 This updates authentik's local password verifier only. It does not attempt 875 to propagate the password change to LDAP or Kerberos because no raw password 876 is available from the request payload. 877 """ 878 user: User = self.get_object() 879 try: 880 user.set_password_from_hash(body.validated_data["password"], request=request) 881 user.save() 882 except ValueError as exc: 883 LOGGER.debug("Failed to set password hash", exc=exc) 884 return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400) 885 except (ValidationError, IntegrityError) as exc: 886 LOGGER.debug("Failed to set password hash", exc=exc) 887 return Response(status=400) 888 self._update_session_hash_after_password_change(request, user) 889 return Response(status=204) 890 891 @permission_required("authentik_core.reset_user_password") 892 @extend_schema( 893 request=UserRecoveryLinkSerializer, 894 responses={ 895 "200": LinkSerializer(many=False), 896 }, 897 ) 898 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 899 @validate(UserRecoveryLinkSerializer) 900 def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response: 901 """Create a temporary link that a user can use to recover their account""" 902 link, _ = self._create_recovery_link( 903 token_duration=body.validated_data.get("token_duration") 904 ) 905 return Response({"link": link}) 906 907 @permission_required("authentik_core.reset_user_password") 908 @extend_schema( 909 request=UserRecoveryEmailSerializer, 910 responses={ 911 "204": OpenApiResponse(description="Successfully sent recover email"), 912 }, 913 ) 914 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 915 @validate(UserRecoveryEmailSerializer) 916 def recovery_email( 917 self, request: Request, pk: int, body: UserRecoveryEmailSerializer 918 ) -> Response: 919 """Send an email with a temporary link that a user can use to recover their account""" 920 email_error_message = _("User does not have an email address set.") 921 stage_error_message = _("Email stage not found.") 922 user: User = self.get_object() 923 if not user.email: 924 LOGGER.debug("User doesn't have an email address") 925 raise ValidationError({"non_field_errors": email_error_message}) 926 if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()): 927 LOGGER.debug("Email stage does not exist") 928 raise ValidationError({"non_field_errors": stage_error_message}) 929 if not request.user.has_perm("authentik_stages_email.view_emailstage", stage): 930 LOGGER.debug("User has no view access to email stage") 931 raise ValidationError({"non_field_errors": stage_error_message}) 932 link, token = self._create_recovery_link( 933 token_duration=body.validated_data.get("token_duration"), for_email=True 934 ) 935 message = TemplateEmailMessage( 936 subject=_(stage.subject), 937 to=[(user.name, user.email)], 938 template_name=stage.template, 939 language=user.locale(request), 940 template_context={ 941 "url": link, 942 "user": user, 943 "expires": token.expires, 944 }, 945 ) 946 send_mails(stage, message) 947 return Response(status=204) 948 949 @permission_required("authentik_core.impersonate") 950 @extend_schema( 951 request=inline_serializer( 952 "ImpersonationSerializer", 953 { 954 "reason": CharField(required=True), 955 }, 956 ), 957 responses={ 958 204: OpenApiResponse(description="Successfully started impersonation"), 959 }, 960 ) 961 @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated]) 962 def impersonate(self, request: Request, pk: int) -> Response: 963 """Impersonate a user""" 964 if not request.tenant.impersonation: 965 LOGGER.debug("User attempted to impersonate", user=request.user) 966 return Response(status=401) 967 user_to_be = self.get_object() 968 reason = request.data.get("reason", "") 969 # Check both object-level perms and global perms 970 if not request.user.has_perm( 971 "authentik_core.impersonate", user_to_be 972 ) and not request.user.has_perm("authentik_core.impersonate"): 973 LOGGER.debug( 974 "User attempted to impersonate without permissions", 975 user=request.user, 976 ) 977 return Response(status=403) 978 if user_to_be.pk == self.request.user.pk: 979 LOGGER.debug("User attempted to impersonate themselves", user=request.user) 980 return Response(status=401) 981 if not reason and request.tenant.impersonation_require_reason: 982 LOGGER.debug( 983 "User attempted to impersonate without providing a reason", 984 user=request.user, 985 ) 986 raise ValidationError({"reason": _("This field is required.")}) 987 988 request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user 989 request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be 990 991 Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be) 992 993 return Response(status=204) 994 995 @extend_schema( 996 request=None, 997 responses={ 998 "204": OpenApiResponse(description="Successfully ended impersonation"), 999 }, 1000 ) 1001 @action(detail=False, methods=["GET"]) 1002 def impersonate_end(self, request: Request) -> Response: 1003 """End Impersonation a user""" 1004 if ( 1005 SESSION_KEY_IMPERSONATE_USER not in request.session 1006 or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session 1007 ): 1008 LOGGER.debug("Can't end impersonation", user=request.user) 1009 return Response(status=204) 1010 1011 original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1012 1013 del request.session[SESSION_KEY_IMPERSONATE_USER] 1014 del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1015 1016 Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user) 1017 1018 return Response(status=204) 1019 1020 @extend_schema( 1021 responses={ 1022 200: inline_serializer( 1023 "UserPathSerializer", 1024 {"paths": ListField(child=CharField(), read_only=True)}, 1025 ) 1026 }, 1027 parameters=[ 1028 OpenApiParameter( 1029 name="search", 1030 location=OpenApiParameter.QUERY, 1031 type=OpenApiTypes.STR, 1032 ) 1033 ], 1034 ) 1035 @action(detail=False, pagination_class=None) 1036 def paths(self, request: Request) -> Response: 1037 """Get all user paths""" 1038 return Response( 1039 data={ 1040 "paths": list( 1041 self.filter_queryset(self.get_queryset()) 1042 .values("path") 1043 .distinct() 1044 .order_by("path") 1045 .values_list("path", flat=True) 1046 ) 1047 } 1048 ) 1049 1050 def partial_update(self, request: Request, *args, **kwargs) -> Response: 1051 response = super().partial_update(request, *args, **kwargs) 1052 instance: User = self.get_object() 1053 if not instance.is_active: 1054 Session.objects.filter(authenticatedsession__user=instance).delete() 1055 LOGGER.debug("Deleted user's sessions", user=instance.username) 1056 return response
User Viewset
628 def get_queryset(self): 629 base_qs = User.objects.all().exclude_anonymous() 630 # Always prefetch groups since group PKs are always serialized. 631 # Use full prefetch when include_groups=true (for groups_obj), ID-only otherwise. 632 if self.serializer_class(context={"request": self.request})._should_include_groups: 633 base_qs = base_qs.prefetch_related("groups") 634 else: 635 base_qs = base_qs.prefetch_related( 636 Prefetch("groups", queryset=Group.objects.all().only("group_uuid")) 637 ) 638 if self.serializer_class(context={"request": self.request})._should_include_roles: 639 base_qs = base_qs.prefetch_related("roles") 640 else: 641 base_qs = base_qs.prefetch_related( 642 Prefetch("roles", queryset=Role.objects.all().only("uuid")) 643 ) 644 # Annotate is_superuser to avoid N+1 query per user 645 base_qs = base_qs.annotate( 646 _annotated_is_superuser=Exists( 647 Group.objects.filter( 648 is_superuser=True, 649 ).filter( 650 Q(users=OuterRef("pk")) | Q(descendant_nodes__descendant__users=OuterRef("pk")) 651 ) 652 ) 653 ) 654 return base_qs
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using self.queryset.
This method should always be used rather than accessing self.queryset
directly, as self.queryset gets evaluated only once, and those results
are cached for all subsequent requests.
You may want to override this if you need to provide different querysets depending on the incoming request.
(Eg. return a list of items that is specific to the user)
714 @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"]) 715 @extend_schema( 716 request=UserServiceAccountSerializer, 717 responses={ 718 200: inline_serializer( 719 "UserServiceAccountResponse", 720 { 721 "username": CharField(required=True), 722 "token": CharField(required=True), 723 "user_uid": CharField(required=True), 724 "user_pk": IntegerField(required=True), 725 "group_pk": CharField(required=False), 726 }, 727 ) 728 }, 729 ) 730 @action( 731 detail=False, 732 methods=["POST"], 733 pagination_class=None, 734 filter_backends=[], 735 ) 736 @validate(UserServiceAccountSerializer) 737 def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response: 738 """Create a new user account that is marked as a service account""" 739 expires = body.validated_data.get("expires", now() + timedelta(days=360)) 740 741 username = body.validated_data["name"] 742 expiring = body.validated_data["expiring"] 743 with atomic(): 744 try: 745 user: User = User.objects.create( 746 username=username, 747 name=username, 748 type=UserTypes.SERVICE_ACCOUNT, 749 attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring}, 750 path=USER_PATH_SERVICE_ACCOUNT, 751 ) 752 user.set_unusable_password() 753 user.save() 754 755 response = { 756 "username": user.username, 757 "user_uid": user.uid, 758 "user_pk": user.pk, 759 } 760 if body.validated_data["create_group"] and self.request.user.has_perm( 761 "authentik_core.add_group" 762 ): 763 group = Group.objects.create(name=username) 764 group.users.add(user) 765 response["group_pk"] = str(group.pk) 766 token = Token.objects.create( 767 identifier=slugify(f"service-account-{username}-password"), 768 intent=TokenIntents.INTENT_APP_PASSWORD, 769 user=user, 770 expires=expires, 771 expiring=expiring, 772 ) 773 response["token"] = token.key 774 return Response(response) 775 except IntegrityError as exc: 776 error_msg = str(exc).lower() 777 778 if "unique" in error_msg: 779 return Response( 780 data={ 781 "non_field_errors": [ 782 _("A user/group with these details already exists") 783 ] 784 }, 785 status=400, 786 ) 787 else: 788 LOGGER.warning("Service account creation failed", exc=exc) 789 return Response( 790 data={"non_field_errors": [_("Unable to create user")]}, 791 status=400, 792 ) 793 except (ValueError, TypeError) as exc: 794 LOGGER.error("Unexpected error during service account creation", exc=exc) 795 return Response( 796 data={"non_field_errors": [_("Unknown error occurred")]}, 797 status=500, 798 )
Create a new user account that is marked as a service account
800 @extend_schema(responses={200: SessionUserSerializer(many=False)}) 801 @action( 802 url_path="me", 803 url_name="me", 804 detail=False, 805 pagination_class=None, 806 filter_backends=[], 807 ) 808 def user_me(self, request: Request) -> Response: 809 """Get information about current user""" 810 context = {"request": request} 811 serializer = SessionUserSerializer( 812 data={"user": UserSelfSerializer(instance=request.user, context=context).data} 813 ) 814 if SESSION_KEY_IMPERSONATE_USER in request._request.session: 815 serializer.initial_data["original"] = UserSelfSerializer( 816 instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER], 817 context=context, 818 ).data 819 self.request.session.modified = True 820 return Response(serializer.initial_data)
Get information about current user
827 @permission_required("authentik_core.reset_user_password") 828 @extend_schema( 829 request=UserPasswordSetSerializer, 830 responses={ 831 204: OpenApiResponse(description="Successfully changed password"), 832 400: OpenApiResponse(description="Bad request"), 833 }, 834 ) 835 @action( 836 detail=True, 837 methods=["POST"], 838 permission_classes=[IsAuthenticated], 839 ) 840 @validate(UserPasswordSetSerializer) 841 def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response: 842 """Set password for user""" 843 user: User = self.get_object() 844 try: 845 user.set_password(body.validated_data["password"], request=request) 846 user.save() 847 except (ValidationError, IntegrityError) as exc: 848 LOGGER.debug("Failed to set password", exc=exc) 849 return Response(status=400) 850 self._update_session_hash_after_password_change(request, user) 851 return Response(status=204)
Set password for user
853 @permission_required("authentik_core.reset_user_password") 854 @extend_schema( 855 request=UserPasswordHashSetSerializer, 856 responses={ 857 204: OpenApiResponse(description="Successfully changed password"), 858 400: OpenApiResponse(description="Bad request"), 859 }, 860 ) 861 @action( 862 detail=True, 863 methods=["POST"], 864 permission_classes=[IsAuthenticated], 865 ) 866 @validate(UserPasswordHashSetSerializer) 867 def set_password_hash( 868 self, request: Request, pk: int, body: UserPasswordHashSetSerializer 869 ) -> Response: 870 """Set a user's password from a pre-hashed Django password value. 871 872 Submit the Django password hash in the shared ``password`` request field. 873 874 This updates authentik's local password verifier only. It does not attempt 875 to propagate the password change to LDAP or Kerberos because no raw password 876 is available from the request payload. 877 """ 878 user: User = self.get_object() 879 try: 880 user.set_password_from_hash(body.validated_data["password"], request=request) 881 user.save() 882 except ValueError as exc: 883 LOGGER.debug("Failed to set password hash", exc=exc) 884 return Response(data={"password": [INVALID_PASSWORD_HASH_MESSAGE]}, status=400) 885 except (ValidationError, IntegrityError) as exc: 886 LOGGER.debug("Failed to set password hash", exc=exc) 887 return Response(status=400) 888 self._update_session_hash_after_password_change(request, user) 889 return Response(status=204)
Set a user's password from a pre-hashed Django password value.
Submit the Django password hash in the shared password request field.
This updates authentik's local password verifier only. It does not attempt to propagate the password change to LDAP or Kerberos because no raw password is available from the request payload.
891 @permission_required("authentik_core.reset_user_password") 892 @extend_schema( 893 request=UserRecoveryLinkSerializer, 894 responses={ 895 "200": LinkSerializer(many=False), 896 }, 897 ) 898 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 899 @validate(UserRecoveryLinkSerializer) 900 def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response: 901 """Create a temporary link that a user can use to recover their account""" 902 link, _ = self._create_recovery_link( 903 token_duration=body.validated_data.get("token_duration") 904 ) 905 return Response({"link": link})
Create a temporary link that a user can use to recover their account
907 @permission_required("authentik_core.reset_user_password") 908 @extend_schema( 909 request=UserRecoveryEmailSerializer, 910 responses={ 911 "204": OpenApiResponse(description="Successfully sent recover email"), 912 }, 913 ) 914 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 915 @validate(UserRecoveryEmailSerializer) 916 def recovery_email( 917 self, request: Request, pk: int, body: UserRecoveryEmailSerializer 918 ) -> Response: 919 """Send an email with a temporary link that a user can use to recover their account""" 920 email_error_message = _("User does not have an email address set.") 921 stage_error_message = _("Email stage not found.") 922 user: User = self.get_object() 923 if not user.email: 924 LOGGER.debug("User doesn't have an email address") 925 raise ValidationError({"non_field_errors": email_error_message}) 926 if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()): 927 LOGGER.debug("Email stage does not exist") 928 raise ValidationError({"non_field_errors": stage_error_message}) 929 if not request.user.has_perm("authentik_stages_email.view_emailstage", stage): 930 LOGGER.debug("User has no view access to email stage") 931 raise ValidationError({"non_field_errors": stage_error_message}) 932 link, token = self._create_recovery_link( 933 token_duration=body.validated_data.get("token_duration"), for_email=True 934 ) 935 message = TemplateEmailMessage( 936 subject=_(stage.subject), 937 to=[(user.name, user.email)], 938 template_name=stage.template, 939 language=user.locale(request), 940 template_context={ 941 "url": link, 942 "user": user, 943 "expires": token.expires, 944 }, 945 ) 946 send_mails(stage, message) 947 return Response(status=204)
Send an email with a temporary link that a user can use to recover their account
949 @permission_required("authentik_core.impersonate") 950 @extend_schema( 951 request=inline_serializer( 952 "ImpersonationSerializer", 953 { 954 "reason": CharField(required=True), 955 }, 956 ), 957 responses={ 958 204: OpenApiResponse(description="Successfully started impersonation"), 959 }, 960 ) 961 @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated]) 962 def impersonate(self, request: Request, pk: int) -> Response: 963 """Impersonate a user""" 964 if not request.tenant.impersonation: 965 LOGGER.debug("User attempted to impersonate", user=request.user) 966 return Response(status=401) 967 user_to_be = self.get_object() 968 reason = request.data.get("reason", "") 969 # Check both object-level perms and global perms 970 if not request.user.has_perm( 971 "authentik_core.impersonate", user_to_be 972 ) and not request.user.has_perm("authentik_core.impersonate"): 973 LOGGER.debug( 974 "User attempted to impersonate without permissions", 975 user=request.user, 976 ) 977 return Response(status=403) 978 if user_to_be.pk == self.request.user.pk: 979 LOGGER.debug("User attempted to impersonate themselves", user=request.user) 980 return Response(status=401) 981 if not reason and request.tenant.impersonation_require_reason: 982 LOGGER.debug( 983 "User attempted to impersonate without providing a reason", 984 user=request.user, 985 ) 986 raise ValidationError({"reason": _("This field is required.")}) 987 988 request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user 989 request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be 990 991 Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be) 992 993 return Response(status=204)
Impersonate a user
995 @extend_schema( 996 request=None, 997 responses={ 998 "204": OpenApiResponse(description="Successfully ended impersonation"), 999 }, 1000 ) 1001 @action(detail=False, methods=["GET"]) 1002 def impersonate_end(self, request: Request) -> Response: 1003 """End Impersonation a user""" 1004 if ( 1005 SESSION_KEY_IMPERSONATE_USER not in request.session 1006 or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session 1007 ): 1008 LOGGER.debug("Can't end impersonation", user=request.user) 1009 return Response(status=204) 1010 1011 original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1012 1013 del request.session[SESSION_KEY_IMPERSONATE_USER] 1014 del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] 1015 1016 Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user) 1017 1018 return Response(status=204)
End Impersonation a user
1020 @extend_schema( 1021 responses={ 1022 200: inline_serializer( 1023 "UserPathSerializer", 1024 {"paths": ListField(child=CharField(), read_only=True)}, 1025 ) 1026 }, 1027 parameters=[ 1028 OpenApiParameter( 1029 name="search", 1030 location=OpenApiParameter.QUERY, 1031 type=OpenApiTypes.STR, 1032 ) 1033 ], 1034 ) 1035 @action(detail=False, pagination_class=None) 1036 def paths(self, request: Request) -> Response: 1037 """Get all user paths""" 1038 return Response( 1039 data={ 1040 "paths": list( 1041 self.filter_queryset(self.get_queryset()) 1042 .values("path") 1043 .distinct() 1044 .order_by("path") 1045 .values_list("path", flat=True) 1046 ) 1047 } 1048 )
Get all user paths
1050 def partial_update(self, request: Request, *args, **kwargs) -> Response: 1051 response = super().partial_update(request, *args, **kwargs) 1052 instance: User = self.get_object() 1053 if not instance.is_active: 1054 Session.objects.filter(authenticatedsession__user=instance).delete() 1055 LOGGER.debug("Deleted user's sessions", user=instance.username) 1056 return response