authentik.core.api.users
User API Views
"""User API Views"""

from datetime import timedelta
from json import loads
from typing import Any

from django.contrib.auth import update_session_auth_hash
from django.contrib.auth.models import AnonymousUser, Permission
from django.db.transaction import atomic
from django.db.utils import IntegrityError
from django.urls import reverse_lazy
from django.utils.http import urlencode
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext as _
from django_filters.filters import (
    BooleanFilter,
    CharFilter,
    IsoDateTimeFilter,
    ModelMultipleChoiceFilter,
    MultipleChoiceFilter,
    UUIDFilter,
)
from django_filters.filterset import FilterSet
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (
    OpenApiParameter,
    OpenApiResponse,
    extend_schema,
    extend_schema_field,
    inline_serializer,
)
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import (
    BooleanField,
    CharField,
    ChoiceField,
    DateTimeField,
    IntegerField,
    ListField,
    SerializerMethodField,
    UUIDField,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import (
    ListSerializer,
    PrimaryKeyRelatedField,
)
from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger

from authentik.api.authentication import TokenAuthentication
from authentik.api.validation import validate
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.brands.models import Brand
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import (
    JSONDictField,
    LinkSerializer,
    ModelSerializer,
    PassiveSerializer,
)
from authentik.core.middleware import (
    SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
    SESSION_KEY_IMPERSONATE_USER,
)
from authentik.core.models import (
    USER_ATTRIBUTE_TOKEN_EXPIRING,
    USER_PATH_SERVICE_ACCOUNT,
    USERNAME_MAX_LENGTH,
    Group,
    Session,
    Token,
    TokenIntents,
    User,
    UserTypes,
    default_token_duration,
)
from authentik.endpoints.connectors.agent.auth import AgentAuth
from authentik.events.models import Event, EventAction
from authentik.flows.exceptions import FlowNonApplicableException
from authentik.flows.models import FlowToken
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
from authentik.flows.views.executor import QS_KEY_TOKEN
from authentik.lib.avatars import get_avatar
from authentik.lib.utils.reflection import ConditionalInheritance
from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
from authentik.rbac.api.roles import RoleSerializer
from authentik.rbac.decorators import permission_required
from authentik.rbac.models import Role, get_permission_choices
from authentik.stages.email.flow import pickle_flow_token_for_email
from authentik.stages.email.models import EmailStage
from authentik.stages.email.tasks import send_mails
from authentik.stages.email.utils import TemplateEmailMessage

LOGGER = get_logger()


class ParamUserSerializer(PassiveSerializer):
    """Partial serializer for query parameters to select a user"""

    # Optional user reference; the anonymous user is excluded from the choices.
    user = PrimaryKeyRelatedField(queryset=User.objects.all().exclude_anonymous(), required=False)


class PartialGroupSerializer(ModelSerializer):
    """Partial Group Serializer, does not include child relations."""

    attributes = JSONDictField(required=False)

    class Meta:
        model = Group
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "attributes",
        ]


class UserSerializer(ModelSerializer):
    """User Serializer"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    attributes = JSONDictField(required=False)
    groups = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Group.objects.all().order_by("name"),
        default=list,
    )
    groups_obj = SerializerMethodField(allow_null=True)
    roles = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Role.objects.all().order_by("name"),
        default=list,
    )
    roles_obj = SerializerMethodField(allow_null=True)
    uid = CharField(read_only=True)
    username = CharField(
        max_length=USERNAME_MAX_LENGTH,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )

    @property
    def _should_include_groups(self) -> bool:
        # Without a request in the context, groups are always included.
        request: Request = self.context.get("request", None)
        if not request:
            return True
        # Only the literal (case-insensitive) "true" enables inclusion.
        return str(request.query_params.get("include_groups", "true")).lower() == "true"

    @property
    def _should_include_roles(self) -> bool:
        # Same contract as `_should_include_groups`, keyed on `include_roles`.
        request: Request = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get("include_roles", "true")).lower() == "true"

    @extend_schema_field(PartialGroupSerializer(many=True))
    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
        # Returns None (instead of serialized groups) when the caller opted out.
        if not self._should_include_groups:
            return None
        return PartialGroupSerializer(instance.groups, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
        # Returns None (instead of serialized roles) when the caller opted out.
        if not self._should_include_roles:
            return None
        return RoleSerializer(instance.roles, many=True).data

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # `password` and `permissions` are only writable in a blueprint context.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["password"] = CharField(required=False, allow_null=True)
            self.fields["permissions"] = ListField(
                required=False,
                child=ChoiceField(choices=get_permission_choices()),
            )

    def _resolve_permissions(self, validated_data: dict) -> list[str]:
        """Pop `permissions` from `validated_data` and return the matching
        permissions as `app_label.codename` strings.

        Entries are expected to contain a dot; the second dot-separated segment
        of each entry is used as the codename to look up. A missing
        `permissions` key yields an empty list."""
        codenames = [x.split(".")[1] for x in validated_data.pop("permissions", [])]
        perms_qs = Permission.objects.filter(codename__in=codenames).values_list(
            "content_type__app_label", "codename"
        )
        return [f"{ct}.{name}" for ct, name in perms_qs]

    def create(self, validated_data: dict) -> User:
        """If this serializer is used in the blueprint context, we allow for
        directly setting a password. However should be done via the `set_password`
        method instead of directly setting it like rest_framework."""
        # Both extra keys must be removed before the model instance is created.
        password = validated_data.pop("password", None)
        perms_list = self._resolve_permissions(validated_data)
        instance: User = super().create(validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def update(self, instance: User, validated_data: dict) -> User:
        """Same as `create` above, set the password directly if we're in a blueprint
        context"""
        password = validated_data.pop("password", None)
        perms_list = self._resolve_permissions(validated_data)
        instance = super().update(instance, validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def _set_password(self, instance: User, password: str | None):
        """Set password of user if we're in a blueprint context, and if it's an empty
        string then use an unusable password"""
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context and password:
            instance.set_password(password)
            instance.save()
        # An empty stored password is replaced by an unusable one.
        if not instance.password:
            instance.set_unusable_password()
            instance.save()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    def validate_path(self, path: str) -> str:
        """Validate path"""
        # startswith/endswith are safe on an empty string, unlike indexing
        # `path[-1]`; an empty path is then rejected by the segment check below.
        if path.startswith("/") or path.endswith("/"):
            raise ValidationError(_("No leading or trailing slashes allowed."))
        for segment in path.split("/"):
            if segment == "":
                raise ValidationError(_("No empty segments in user path allowed."))
        return path

    def validate_type(self, user_type: str) -> str:
        """Validate user type, internal_service_account is an internal value"""
        # Existing internal service accounts must keep their type.
        if (
            self.instance
            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
        ):
            raise ValidationError(_("Can't change internal service account to other user type."))
        # New users can't be created as internal service accounts.
        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
            raise ValidationError(_("Setting a user to internal service account is not allowed."))
        return user_type

    def validate(self, attrs: dict) -> dict:
        # Any change to an existing internal service account is rejected.
        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
            raise ValidationError(_("Can't modify internal service account users"))
        return super().validate(attrs)

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "last_login",
            "date_joined",
            "is_superuser",
            "groups",
            "groups_obj",
            "roles",
            "roles_obj",
            "email",
            "avatar",
            "attributes",
            "uid",
            "path",
            "type",
            "uuid",
            "password_change_date",
            "last_updated",
        ]
        extra_kwargs = {
            "name": {"allow_blank": True},
            "date_joined": {"read_only": True},
            "password_change_date": {"read_only": True},
        }


class UserSelfSerializer(ModelSerializer):
    """User Serializer for information a user can retrieve about themselves"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    groups = SerializerMethodField()
    roles = SerializerMethodField()
    uid = CharField(read_only=True)
    settings = SerializerMethodField()
    system_permissions = SerializerMethodField()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfGroups",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_groups(self, _: User):
        """Return only the group names a user is member of"""
        # Reads from `self.instance`; the method argument is unused.
        for group in self.instance.all_groups().order_by("name"):
            yield {
                "name": group.name,
                "pk": group.pk,
            }

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfRoles",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_roles(self, _: User):
        """Return only the roles a user is member of"""
        # Reads from `self.instance`; the method argument is unused.
        for role in self.instance.all_roles().order_by("name"):
            yield {
                "name": role.name,
                "pk": role.pk,
            }

    def get_settings(self, user: User) -> dict[str, Any]:
        """Get user settings with brand and group settings applied"""
        return user.group_attributes(self._context["request"]).get("settings", {})

    def get_system_permissions(self, user: User) -> list[str]:
        """Get all system permissions assigned to the user"""
        # Keep only `authentik_rbac*` permissions and strip everything up to
        # and including the first dot.
        return [
            x.split(".", maxsplit=1)[1]
            for x in user.get_all_permissions()
            if x.startswith("authentik_rbac")
        ]

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "is_superuser",
            "groups",
            "roles",
            "email",
            "avatar",
            "uid",
            "settings",
            "type",
            "system_permissions",
        ]
        extra_kwargs = {
            "is_active": {"read_only": True},
            "name": {"allow_blank": True},
        }


class SessionUserSerializer(PassiveSerializer):
    """Response for the /user/me endpoint, returns the currently active user (as `user` property)
    and, if this user is being impersonated, the original user in the `original` property.
    """

    user = UserSelfSerializer()
    original = UserSelfSerializer(required=False)


class UserPasswordSetSerializer(PassiveSerializer):
    """Payload to set a users' password directly"""

    password = CharField(required=True)


class UserServiceAccountSerializer(PassiveSerializer):
    """Payload to create a service account"""

    name = CharField(
        required=True,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )
    create_group = BooleanField(default=False)
    expiring = BooleanField(default=True)
    expires = DateTimeField(
        required=False,
        help_text="If not provided, valid for 360 days",
    )


class UserRecoveryLinkSerializer(PassiveSerializer):
    """Payload to create a recovery link"""

    token_duration = CharField(required=False)


class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
    """Payload to create and email a recovery link"""

    email_stage = UUIDField()


class UsersFilter(FilterSet):
    """Filter for users"""

    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
    date_joined = IsoDateTimeFilter(field_name="date_joined")
    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")

    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
    last_updated = IsoDateTimeFilter(field_name="last_updated")
    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")

    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
    last_login = IsoDateTimeFilter(field_name="last_login")
    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")

    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
    uuid = UUIDFilter(field_name="uuid")

    path = CharFilter(field_name="path")
    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")

    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")

    groups_by_name = ModelMultipleChoiceFilter(
        field_name="groups__name",
        to_field_name="name",
        queryset=Group.objects.all().order_by("name"),
    )
    groups_by_pk = ModelMultipleChoiceFilter(
        field_name="groups",
        queryset=Group.objects.all().order_by("name"),
    )

    roles_by_name = ModelMultipleChoiceFilter(
        field_name="roles__name",
        to_field_name="name",
        queryset=Role.objects.all().order_by("name"),
    )
    roles_by_pk = ModelMultipleChoiceFilter(
        field_name="roles",
        queryset=Role.objects.all().order_by("name"),
    )

    def filter_is_superuser(self, queryset, name, value):
        """Filter users by membership in a group with `is_superuser` set"""
        # distinct() because the join on groups can repeat a user per group.
        if value:
            return queryset.filter(groups__is_superuser=True).distinct()
        return queryset.exclude(groups__is_superuser=True).distinct()

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args"""
        try:
            value = loads(value)
        except ValueError:
            raise ValidationError(_("filter: failed to parse JSON")) from None
        if not isinstance(value, dict):
            raise ValidationError(_("filter: value must be key:value mapping"))
        # `_value` rather than `_`, which is the gettext alias in this module.
        lookups = {f"attributes__{key}": _value for key, _value in value.items()}
        try:
            # Evaluate once so a ValueError surfaces here; on failure the
            # unfiltered queryset is returned instead.
            len(queryset.filter(**lookups))
            return queryset.filter(**lookups)
        except ValueError:
            return queryset

    class Meta:
        model = User
        fields = [
            "username",
            "email",
            "date_joined",
            "last_updated",
            "last_login",
            "name",
            "is_active",
            "is_superuser",
            "attributes",
            "groups_by_name",
            "groups_by_pk",
            "roles_by_name",
            "roles_by_pk",
            "type",
        ]


class UserViewSet(
    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
    UsedByMixin,
    ModelViewSet,
):
    """User Viewset"""

    queryset = User.objects.none()
    ordering = ["username", "date_joined", "last_updated", "last_login"]
    serializer_class = UserSerializer
    filterset_class = UsersFilter
    search_fields = ["email", "name", "uuid", "username"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        # Imported lazily, inside the method, as in the original code.
        from djangoql.schema import BoolField, StrField

        from authentik.enterprise.search.fields import (
            ChoiceSearchField,
            JSONSearchField,
        )

        return [
            StrField(User, "username"),
            StrField(User, "name"),
            StrField(User, "email"),
            StrField(User, "path"),
            BoolField(User, "is_active", nullable=True),
            ChoiceSearchField(User, "type"),
            JSONSearchField(User, "attributes"),
        ]

    def get_queryset(self):
        # Prefetch groups/roles only when the serializer will render them.
        base_qs = User.objects.all().exclude_anonymous()
        serializer = self.serializer_class(context={"request": self.request})
        if serializer._should_include_groups:
            base_qs = base_qs.prefetch_related("groups")
        if serializer._should_include_roles:
            base_qs = base_qs.prefetch_related("roles")
        return base_qs

    @extend_schema(
        parameters=[
            OpenApiParameter("include_groups", bool, default=True),
            OpenApiParameter("include_roles", bool, default=True),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Overridden only to attach the extra query parameters to the schema.
        return super().list(request, *args, **kwargs)

    def _create_recovery_link(
        self, token_duration: str | None, for_email=False
    ) -> tuple[str, Token]:
        """Create a recovery link (when the current brand has a recovery flow set),
        that can either be shown to an admin or sent to the user directly"""
        brand: Brand = self.request.brand
        # Check that there is a recovery flow, if not return an error
        flow = brand.flow_recovery
        if not flow:
            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
        user: User = self.get_object()
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        # The plan is built with an anonymous user on the request; the target
        # user is passed as the pending user in the plan context.
        self.request._request.user = AnonymousUser()
        try:
            plan = planner.plan(
                self.request._request,
                {
                    PLAN_CONTEXT_PENDING_USER: user,
                },
            )
        except FlowNonApplicableException:
            raise ValidationError(
                {"non_field_errors": _("Recovery flow not applicable to user")}
            ) from None
        _plan = FlowToken.pickle(plan)
        if for_email:
            _plan = pickle_flow_token_for_email(plan)
        expires = default_token_duration()
        if token_duration:
            timedelta_string_validator(token_duration)
            expires = now() + timedelta_from_string(token_duration)
        # One token per user: an existing password-reset token is overwritten.
        token, __ = FlowToken.objects.update_or_create(
            identifier=f"{user.uid}-password-reset",
            defaults={
                "user": user,
                "flow": flow,
                "_plan": _plan,
                "revoke_on_execution": not for_email,
                "expires": expires,
            },
        )
        querystring = urlencode({QS_KEY_TOKEN: token.key})
        link = self.request.build_absolute_uri(
            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
            + f"?{querystring}"
        )
        return link, token

    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
    @extend_schema(
        request=UserServiceAccountSerializer,
        responses={
            200: inline_serializer(
                "UserServiceAccountResponse",
                {
                    "username": CharField(required=True),
                    "token": CharField(required=True),
                    "user_uid": CharField(required=True),
                    "user_pk": IntegerField(required=True),
                    "group_pk": CharField(required=False),
                },
            )
        },
    )
    @action(
        detail=False,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
    )
    @validate(UserServiceAccountSerializer)
    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
        """Create a new user account that is marked as a service account"""
        expires = body.validated_data.get("expires", now() + timedelta(days=360))

        username = body.validated_data["name"]
        expiring = body.validated_data["expiring"]
        with atomic():
            try:
                user: User = User.objects.create(
                    username=username,
                    name=username,
                    type=UserTypes.SERVICE_ACCOUNT,
                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
                    path=USER_PATH_SERVICE_ACCOUNT,
                )
                user.set_unusable_password()
                user.save()

                response = {
                    "username": user.username,
                    "user_uid": user.uid,
                    "user_pk": user.pk,
                }
                # The group is only created when the caller may add groups;
                # otherwise `create_group` is silently ignored.
                if body.validated_data["create_group"] and self.request.user.has_perm(
                    "authentik_core.add_group"
                ):
                    group = Group.objects.create(name=username)
                    group.users.add(user)
                    response["group_pk"] = str(group.pk)
                token = Token.objects.create(
                    identifier=slugify(f"service-account-{username}-password"),
                    intent=TokenIntents.INTENT_APP_PASSWORD,
                    user=user,
                    expires=expires,
                    expiring=expiring,
                )
                response["token"] = token.key
                return Response(response)
            except IntegrityError as exc:
                error_msg = str(exc).lower()

                if "unique" in error_msg:
                    return Response(
                        data={
                            "non_field_errors": [
                                _("A user/group with these details already exists")
                            ]
                        },
                        status=400,
                    )
                else:
                    LOGGER.warning("Service account creation failed", exc=exc)
                    return Response(
                        data={"non_field_errors": [_("Unable to create user")]},
                        status=400,
                    )
            except (ValueError, TypeError) as exc:
                LOGGER.error("Unexpected error during service account creation", exc=exc)
                return Response(
                    data={"non_field_errors": [_("Unknown error occurred")]},
                    status=500,
                )

    @extend_schema(responses={200: SessionUserSerializer(many=False)})
    @action(
        url_path="me",
        url_name="me",
        detail=False,
        pagination_class=None,
        filter_backends=[],
    )
    def user_me(self, request: Request) -> Response:
        """Get information about current user"""
        context = {"request": request}
        # The serializer only carries the payload; `initial_data` is returned
        # directly without running validation.
        serializer = SessionUserSerializer(
            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
        )
        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
            serializer.initial_data["original"] = UserSelfSerializer(
                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
                context=context,
            ).data
        self.request.session.modified = True
        return Response(serializer.initial_data)

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserPasswordSetSerializer,
        responses={
            204: OpenApiResponse(description="Successfully changed password"),
            400: OpenApiResponse(description="Bad request"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserPasswordSetSerializer)
    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
        """Set password for user"""
        user: User = self.get_object()
        try:
            user.set_password(body.validated_data["password"], request=request)
            user.save()
        except (ValidationError, IntegrityError) as exc:
            LOGGER.debug("Failed to set password", exc=exc)
            return Response(status=400)
        # Refresh the session hash only when a non-impersonating user changed
        # their own password.
        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
            LOGGER.debug("Updating session hash after password change")
            update_session_auth_hash(self.request, user)
        return Response(status=204)

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserRecoveryLinkSerializer,
        responses={
            "200": LinkSerializer(many=False),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(UserRecoveryLinkSerializer)
    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
        """Create a temporary link that a user can use to recover their account"""
        # `__` for the unused token so the gettext alias `_` is not shadowed.
        link, __ = self._create_recovery_link(
            token_duration=body.validated_data.get("token_duration")
        )
        return Response({"link": link})

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserRecoveryEmailSerializer,
        responses={
            "204": OpenApiResponse(description="Successfully sent recover email"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(UserRecoveryEmailSerializer)
    def recovery_email(
        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
    ) -> Response:
        """Send an email with a temporary link that a user can use to recover their account"""
        email_error_message = _("User does not have an email address set.")
        stage_error_message = _("Email stage not found.")
        user: User = self.get_object()
        if not user.email:
            LOGGER.debug("User doesn't have an email address")
            raise ValidationError({"non_field_errors": email_error_message})
        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
            LOGGER.debug("Email stage does not exist")
            raise ValidationError({"non_field_errors": stage_error_message})
        # Same error message as a missing stage when view access is lacking.
        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
            LOGGER.debug("User has no view access to email stage")
            raise ValidationError({"non_field_errors": stage_error_message})
        link, token = self._create_recovery_link(
            token_duration=body.validated_data.get("token_duration"), for_email=True
        )
        message = TemplateEmailMessage(
            subject=_(stage.subject),
            to=[(user.name, user.email)],
            template_name=stage.template,
            language=user.locale(request),
            template_context={
                "url": link,
                "user": user,
                "expires": token.expires,
            },
        )
        send_mails(stage, message)
        return Response(status=204)

    @permission_required("authentik_core.impersonate")
    @extend_schema(
        request=inline_serializer(
            "ImpersonationSerializer",
            {
                "reason": CharField(required=True),
            },
        ),
        responses={
            204: OpenApiResponse(description="Successfully started impersonation"),
        },
    )
    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
    def impersonate(self, request: Request, pk: int) -> Response:
        """Impersonate a user"""
        # Impersonation can be disabled per tenant.
        if not request.tenant.impersonation:
            LOGGER.debug("User attempted to impersonate", user=request.user)
            return Response(status=401)
        user_to_be = self.get_object()
        reason = request.data.get("reason", "")
        # Check both object-level perms and global perms
        if not request.user.has_perm(
            "authentik_core.impersonate", user_to_be
        ) and not request.user.has_perm("authentik_core.impersonate"):
            LOGGER.debug(
                "User attempted to impersonate without permissions",
                user=request.user,
            )
            return Response(status=403)
        if user_to_be.pk == self.request.user.pk:
            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
            return Response(status=401)
        if not reason and request.tenant.impersonation_require_reason:
            LOGGER.debug(
                "User attempted to impersonate without providing a reason",
                user=request.user,
            )
            raise ValidationError({"reason": _("This field is required.")})

        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be

        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)

        return Response(status=204)

    @extend_schema(
        request=None,
        responses={
            "204": OpenApiResponse(description="Successfully ended impersonation"),
        },
    )
    @action(detail=False, methods=["GET"])
    def impersonate_end(self, request: Request) -> Response:
        """End Impersonation a user"""
        # Nothing to end unless both session keys are present; still a 204.
        if (
            SESSION_KEY_IMPERSONATE_USER not in request.session
            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
        ):
            LOGGER.debug("Can't end impersonation", user=request.user)
            return Response(status=204)

        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]

        del request.session[SESSION_KEY_IMPERSONATE_USER]
        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]

        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)

        return Response(status=204)

    @extend_schema(
        responses={
            200: inline_serializer(
                "UserPathSerializer",
                {"paths": ListField(child=CharField(), read_only=True)},
            )
        },
        parameters=[
            OpenApiParameter(
                name="search",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None)
    def paths(self, request: Request) -> Response:
        """Get all user paths"""
        # Distinct, sorted paths of the users matching the active filters.
        return Response(
            data={
                "paths": list(
                    self.filter_queryset(self.get_queryset())
                    .values("path")
                    .distinct()
                    .order_by("path")
                    .values_list("path", flat=True)
                )
            }
        )

    def partial_update(self, request: Request, *args, **kwargs) -> Response:
        response = super().partial_update(request, *args, **kwargs)
        # Re-read the user after the update; if it is inactive, drop its sessions.
        instance: User = self.get_object()
        if not instance.is_active:
            Session.objects.filter(authenticatedsession__user=instance).delete()
            LOGGER.debug("Deleted user's sessions", user=instance.username)
        return response
class ParamUserSerializer(PassiveSerializer):
    """Partial serializer for query parameters to select a user"""

    # Optional user reference; the anonymous user is excluded from the
    # queryset of selectable users.
    user = PrimaryKeyRelatedField(
        required=False,
        queryset=User.objects.all().exclude_anonymous(),
    )
Partial serializer for query parameters to select a user
Inherited Members
class PartialGroupSerializer(ModelSerializer):
    """Partial Group Serializer, does not include child relations."""

    # Optional on input.
    attributes = JSONDictField(required=False)

    class Meta:
        # Serializes Group rows, exposing only the fields listed here.
        model = Group
        fields = ["pk", "num_pk", "name", "is_superuser", "attributes"]
Partial Group Serializer, does not include child relations.
Inherited Members
class Meta:
    # Serializer options: the backing model and the exposed fields.
    model = Group
    fields = ["pk", "num_pk", "name", "is_superuser", "attributes"]
class UserSerializer(ModelSerializer):
    """User Serializer

    Serializes `User` objects. When the serializer context contains
    `SERIALIZER_CONTEXT_BLUEPRINT`, two extra writable fields (`password` and
    `permissions`) are added, see `__init__`.
    """

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    attributes = JSONDictField(required=False)
    groups = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Group.objects.all().order_by("name"),
        default=list,
    )
    groups_obj = SerializerMethodField(allow_null=True)
    roles = PrimaryKeyRelatedField(
        allow_empty=True,
        many=True,
        queryset=Role.objects.all().order_by("name"),
        default=list,
    )
    roles_obj = SerializerMethodField(allow_null=True)
    uid = CharField(read_only=True)
    username = CharField(
        max_length=USERNAME_MAX_LENGTH,
        validators=[UniqueValidator(queryset=User.objects.all().order_by("username"))],
    )

    def _query_flag(self, name: str) -> bool:
        """Read the boolean query parameter `name` from the request in the context.

        Returns True when there is no request in the context or the parameter is
        absent; otherwise True only if the value equals "true" (case-insensitive)."""
        request: Request = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, "true")).lower() == "true"

    @property
    def _should_include_groups(self) -> bool:
        """Whether `groups_obj` should be populated (`include_groups` query parameter)"""
        return self._query_flag("include_groups")

    @property
    def _should_include_roles(self) -> bool:
        """Whether `roles_obj` should be populated (`include_roles` query parameter)"""
        return self._query_flag("include_roles")

    @extend_schema_field(PartialGroupSerializer(many=True))
    def get_groups_obj(self, instance: User) -> list[PartialGroupSerializer] | None:
        """Serialized groups of the user, or None when groups are not requested"""
        if not self._should_include_groups:
            return None
        return PartialGroupSerializer(instance.groups, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_roles_obj(self, instance: User) -> list[RoleSerializer] | None:
        """Serialized roles of the user, or None when roles are not requested"""
        if not self._should_include_roles:
            return None
        return RoleSerializer(instance.roles, many=True).data

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # `password` and `permissions` are only accepted in a blueprint context
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["password"] = CharField(required=False, allow_null=True)
            self.fields["permissions"] = ListField(
                required=False,
                child=ChoiceField(choices=get_permission_choices()),
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list[str]:
        """Remove `permissions` from `validated_data` and return the matching
        permissions as `<app_label>.<codename>` strings.

        Only the part after the first dot of each submitted value is used as the
        codename to look up."""
        codenames = [x.split(".")[1] for x in validated_data.pop("permissions", [])]
        perms_qs = Permission.objects.filter(codename__in=codenames).values_list(
            "content_type__app_label", "codename"
        )
        return [f"{ct}.{name}" for ct, name in perms_qs]

    def create(self, validated_data: dict) -> User:
        """If this serializer is used in the blueprint context, we allow for
        directly setting a password. However, this should be done via the
        `set_password` method instead of directly setting it like rest_framework."""
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance: User = super().create(validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def update(self, instance: User, validated_data: dict) -> User:
        """Same as `create` above, set the password directly if we're in a blueprint
        context"""
        password = validated_data.pop("password", None)
        perms_list = self._pop_permissions(validated_data)
        instance = super().update(instance, validated_data)
        self._set_password(instance, password)
        instance.assign_perms_to_managed_role(perms_list)
        return instance

    def _set_password(self, instance: User, password: str | None):
        """Set password of user if we're in a blueprint context, and if the stored
        password is empty afterwards then use an unusable password"""
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context and password:
            instance.set_password(password)
            instance.save()
        if not instance.password:
            instance.set_unusable_password()
            instance.save()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    def validate_path(self, path: str) -> str:
        """Validate path

        Raises a ValidationError for a leading or trailing slash, or for any empty
        segment (which includes the empty string)."""
        # startswith/endswith are safe on an empty string, unlike `path[-1]`
        if path.startswith("/") or path.endswith("/"):
            raise ValidationError(_("No leading or trailing slashes allowed."))
        if any(segment == "" for segment in path.split("/")):
            raise ValidationError(_("No empty segments in user path allowed."))
        return path

    def validate_type(self, user_type: str) -> str:
        """Validate user type, internal_service_account is an internal value"""
        if (
            self.instance
            and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT
            and user_type != UserTypes.INTERNAL_SERVICE_ACCOUNT.value
        ):
            raise ValidationError(_("Can't change internal service account to other user type."))
        if not self.instance and user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT.value:
            raise ValidationError(_("Setting a user to internal service account is not allowed."))
        return user_type

    def validate(self, attrs: dict) -> dict:
        """Reject any modification of an existing internal service account"""
        if self.instance and self.instance.type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
            raise ValidationError(_("Can't modify internal service account users"))
        return super().validate(attrs)

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "last_login",
            "date_joined",
            "is_superuser",
            "groups",
            "groups_obj",
            "roles",
            "roles_obj",
            "email",
            "avatar",
            "attributes",
            "uid",
            "path",
            "type",
            "uuid",
            "password_change_date",
            "last_updated",
        ]
        extra_kwargs = {
            "name": {"allow_blank": True},
            "date_joined": {"read_only": True},
            "password_change_date": {"read_only": True},
        }
User Serializer
def __init__(self, *args, **kwargs):
    """Initialise the serializer; in a blueprint context additionally accept
    the `password` and `permissions` fields."""
    super().__init__(*args, **kwargs)
    if SERIALIZER_CONTEXT_BLUEPRINT not in self.context:
        return
    self.fields["password"] = CharField(required=False, allow_null=True)
    permission_choice = ChoiceField(choices=get_permission_choices())
    self.fields["permissions"] = ListField(required=False, child=permission_choice)
def create(self, validated_data: dict) -> User:
    """Create the user. In the blueprint context a password may be supplied;
    it is applied through `_set_password` rather than written to the model
    field directly, and the submitted permissions are assigned to the user's
    managed role."""
    new_password = validated_data.pop("password", None)
    requested = validated_data.pop("permissions", [])
    # Only the part after the first dot is used for the lookup
    codenames = [entry.split(".")[1] for entry in requested]
    matches = Permission.objects.filter(codename__in=codenames).values_list(
        "content_type__app_label", "codename"
    )
    perms_list = [f"{app_label}.{codename}" for app_label, codename in matches]
    user: User = super().create(validated_data)
    self._set_password(user, new_password)
    user.assign_perms_to_managed_role(perms_list)
    return user
If this serializer is used in the blueprint context, we allow for
directly setting a password. However, this should be done via the set_password
method instead of directly setting it like rest_framework.
def update(self, instance: User, validated_data: dict) -> User:
    """Update the user; mirrors `create`: an optional password is applied via
    `_set_password` and the submitted permissions are assigned to the user's
    managed role."""
    new_password = validated_data.pop("password", None)
    requested = validated_data.pop("permissions", [])
    # Only the part after the first dot is used for the lookup
    codenames = [entry.split(".")[1] for entry in requested]
    matches = Permission.objects.filter(codename__in=codenames).values_list(
        "content_type__app_label", "codename"
    )
    perms_list = [f"{app_label}.{codename}" for app_label, codename in matches]
    updated = super().update(instance, validated_data)
    self._set_password(updated, new_password)
    updated.assign_perms_to_managed_role(perms_list)
    return updated
Same as create above, set the password directly if we're in a blueprint
context
def get_avatar(self, user: User) -> str:
    """Avatar of the user, either a http/https URL or a data URI"""
    # The request is optional in the serializer context
    current_request = self.context.get("request")
    return get_avatar(user, current_request)
User's avatar, either a http/https URL or a data URI
def validate_path(self, path: str) -> str:
    """Validate path

    Returns the path unchanged when valid. Raises a ValidationError for a
    leading or trailing slash, or for any empty segment (which includes the
    empty string)."""
    # startswith/endswith are safe on an empty string, unlike `path[-1]`,
    # which raised IndexError instead of a validation error
    if path.startswith("/") or path.endswith("/"):
        raise ValidationError(_("No leading or trailing slashes allowed."))
    if any(segment == "" for segment in path.split("/")):
        raise ValidationError(_("No empty segments in user path allowed."))
    return path
Validate path
def validate_type(self, user_type: str) -> str:
    """Validate the user type: the internal service account type can neither be
    assigned to a new user nor be changed on an existing one"""
    internal = UserTypes.INTERNAL_SERVICE_ACCOUNT
    existing = self.instance
    if existing:
        # Updating: an internal service account must keep its type
        if existing.type == internal and user_type != internal.value:
            raise ValidationError(_("Can't change internal service account to other user type."))
    elif user_type == internal.value:
        # Creating: the internal type cannot be requested
        raise ValidationError(_("Setting a user to internal service account is not allowed."))
    return user_type
Validate user type, internal_service_account is an internal value
Inherited Members
class Meta:
    model = User
    fields = [
        # identity
        "pk",
        "username",
        "name",
        # state and timestamps
        "is_active",
        "last_login",
        "date_joined",
        "is_superuser",
        # memberships (primary keys and serialized objects)
        "groups",
        "groups_obj",
        "roles",
        "roles_obj",
        # profile
        "email",
        "avatar",
        "attributes",
        "uid",
        "path",
        "type",
        "uuid",
        "password_change_date",
        "last_updated",
    ]
    extra_kwargs = {
        "name": {"allow_blank": True},
        # timestamps exposed read-only
        **{
            field_name: {"read_only": True}
            for field_name in ("date_joined", "password_change_date")
        },
    }
class UserSelfSerializer(ModelSerializer):
    """User Serializer for information a user can retrieve about themselves"""

    is_superuser = BooleanField(read_only=True)
    avatar = SerializerMethodField()
    groups = SerializerMethodField()
    roles = SerializerMethodField()
    uid = CharField(read_only=True)
    settings = SerializerMethodField()
    system_permissions = SerializerMethodField()

    def get_avatar(self, user: User) -> str:
        """User's avatar, either a http/https URL or a data URI"""
        return get_avatar(user, self.context.get("request"))

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfGroups",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_groups(self, user: User) -> list[dict[str, Any]]:
        """Return name and pk of every group the user is a member of, ordered by name"""
        # Use the object being serialized rather than `self.instance`, which is
        # not the current user when this serializer is nested or used with many=True.
        # A list (not a generator) so the result can be read more than once.
        return [
            {"name": group.name, "pk": group.pk}
            for group in user.all_groups().order_by("name")
        ]

    @extend_schema_field(
        ListSerializer(
            child=inline_serializer(
                "UserSelfRoles",
                {
                    "name": CharField(read_only=True),
                    "pk": CharField(read_only=True),
                },
            )
        )
    )
    def get_roles(self, user: User) -> list[dict[str, Any]]:
        """Return name and pk of every role the user is a member of, ordered by name"""
        # See get_groups for why the passed object and a list are used
        return [
            {"name": role.name, "pk": role.pk}
            for role in user.all_roles().order_by("name")
        ]

    def get_settings(self, user: User) -> dict[str, Any]:
        """Get user settings with brand and group settings applied"""
        # `self.context` (not the private `_context`) also resolves the request
        # when this serializer is nested in another serializer
        return user.group_attributes(self.context["request"]).get("settings", {})

    def get_system_permissions(self, user: User) -> list[str]:
        """Get all system permissions assigned to the user

        Only permissions whose name starts with `authentik_rbac` are returned,
        with everything up to the first dot removed."""
        return [
            x.split(".", maxsplit=1)[1]
            for x in user.get_all_permissions()
            if x.startswith("authentik_rbac")
        ]

    class Meta:
        model = User
        fields = [
            "pk",
            "username",
            "name",
            "is_active",
            "is_superuser",
            "groups",
            "roles",
            "email",
            "avatar",
            "uid",
            "settings",
            "type",
            "system_permissions",
        ]
        extra_kwargs = {
            "is_active": {"read_only": True},
            "name": {"allow_blank": True},
        }
User Serializer for information a user can retrieve about themselves
def get_avatar(self, user: User) -> str:
    """Avatar of the user, either a http/https URL or a data URI"""
    # The request may be absent from the context, in which case None is passed on
    maybe_request = self.context.get("request")
    return get_avatar(user, maybe_request)
User's avatar, either a http/https URL or a data URI
@extend_schema_field(
    ListSerializer(
        child=inline_serializer(
            "UserSelfGroups",
            {
                "name": CharField(read_only=True),
                "pk": CharField(read_only=True),
            },
        )
    )
)
def get_groups(self, user: User) -> list[dict[str, Any]]:
    """Return name and pk of every group the user is a member of, ordered by name"""
    # Use the object being serialized rather than `self.instance`, which is
    # not the current user when this serializer is nested or used with many=True.
    # A list (not a generator) so the result can be read more than once.
    return [
        {"name": group.name, "pk": group.pk}
        for group in user.all_groups().order_by("name")
    ]
Return the name and pk of each group the user is a member of
@extend_schema_field(
    ListSerializer(
        child=inline_serializer(
            "UserSelfRoles",
            {
                "name": CharField(read_only=True),
                "pk": CharField(read_only=True),
            },
        )
    )
)
def get_roles(self, user: User) -> list[dict[str, Any]]:
    """Return name and pk of every role the user is a member of, ordered by name"""
    # Use the object being serialized rather than `self.instance`, which is
    # not the current user when this serializer is nested or used with many=True.
    # A list (not a generator) so the result can be read more than once.
    return [
        {"name": role.name, "pk": role.pk}
        for role in user.all_roles().order_by("name")
    ]
Return the name and pk of each role the user is a member of
def get_settings(self, user: User) -> dict[str, Any]:
    """Get user settings with brand and group settings applied

    Returns the `settings` entry of `user.group_attributes(request)`, or an
    empty dict when there is none. Raises KeyError when the serializer context
    has no `request`."""
    # `self.context` (not the private `_context`) also resolves the request
    # when this serializer is nested in another serializer
    request = self.context["request"]
    return user.group_attributes(request).get("settings", {})
Get user settings with brand and group settings applied
def get_system_permissions(self, user: User) -> list[str]:
    """Collect the system permissions of the user: every permission whose name
    starts with `authentik_rbac`, with everything up to the first dot removed"""
    system_permissions = []
    for permission in user.get_all_permissions():
        if not permission.startswith("authentik_rbac"):
            continue
        _prefix, codename = permission.split(".", maxsplit=1)
        system_permissions.append(codename)
    return system_permissions
Get all system permissions assigned to the user
Inherited Members
class Meta:
    model = User
    fields = [
        # identity
        "pk",
        "username",
        "name",
        # state
        "is_active",
        "is_superuser",
        # memberships
        "groups",
        "roles",
        # profile
        "email",
        "avatar",
        "uid",
        "settings",
        "type",
        "system_permissions",
    ]
    extra_kwargs = dict(
        is_active={"read_only": True},
        name={"allow_blank": True},
    )
class SessionUserSerializer(PassiveSerializer):
    """Response for the /user/me endpoint.

    `user` is the currently active user; `original` is only present while
    impersonating and then holds the user who started the impersonation.
    """

    # Currently active user
    user = UserSelfSerializer()
    # Original user, optional because it only exists during impersonation
    original = UserSelfSerializer(required=False)
Response for the /user/me endpoint, returns the currently active user (as user property)
and, if this user is being impersonated, the original user in the original property.
Inherited Members
class UserPasswordSetSerializer(PassiveSerializer):
    """Payload to set a user's password directly"""

    # The new password, mandatory
    password = CharField(required=True)
Payload to set a user's password directly
Inherited Members
class UserServiceAccountSerializer(PassiveSerializer):
    """Payload to create a service account"""

    # Name of the service account; validated for uniqueness against existing users
    name = CharField(
        required=True,
        validators=[UniqueValidator(queryset=User.objects.order_by("username"))],
    )
    # Whether a group should be created alongside the account
    create_group = BooleanField(default=False)
    # Whether the account's token expires
    expiring = BooleanField(default=True)
    expires = DateTimeField(
        help_text="If not provided, valid for 360 days",
        required=False,
    )
Payload to create a service account
Inherited Members
class UserRecoveryLinkSerializer(PassiveSerializer):
    """Payload to create a recovery link"""

    # Optional duration string for the recovery token
    token_duration = CharField(required=False)
Payload to create a recovery link
Inherited Members
class UserRecoveryEmailSerializer(UserRecoveryLinkSerializer):
    """Payload to create a recovery link and send it by email"""

    # Primary key of the email stage used to send the message
    email_stage = UUIDField()
Payload to create and email a recovery link
class UsersFilter(FilterSet):
    """Filter for users"""

    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    date_joined__lt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="lt")
    date_joined = IsoDateTimeFilter(field_name="date_joined")
    date_joined__gt = IsoDateTimeFilter(field_name="date_joined", lookup_expr="gt")

    last_updated__lt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="lt")
    last_updated = IsoDateTimeFilter(field_name="last_updated")
    last_updated__gt = IsoDateTimeFilter(field_name="last_updated", lookup_expr="gt")

    last_login__lt = IsoDateTimeFilter(field_name="last_login", lookup_expr="lt")
    last_login = IsoDateTimeFilter(field_name="last_login")
    last_login__gt = IsoDateTimeFilter(field_name="last_login", lookup_expr="gt")
    last_login__isnull = BooleanFilter(field_name="last_login", lookup_expr="isnull")

    is_superuser = BooleanFilter(field_name="groups", method="filter_is_superuser")
    uuid = UUIDFilter(field_name="uuid")

    path = CharFilter(field_name="path")
    path_startswith = CharFilter(field_name="path", lookup_expr="startswith")

    type = MultipleChoiceFilter(choices=UserTypes.choices, field_name="type")

    groups_by_name = ModelMultipleChoiceFilter(
        field_name="groups__name",
        to_field_name="name",
        queryset=Group.objects.all().order_by("name"),
    )
    groups_by_pk = ModelMultipleChoiceFilter(
        field_name="groups",
        queryset=Group.objects.all().order_by("name"),
    )

    roles_by_name = ModelMultipleChoiceFilter(
        field_name="roles__name",
        to_field_name="name",
        queryset=Role.objects.all().order_by("name"),
    )
    roles_by_pk = ModelMultipleChoiceFilter(
        field_name="roles",
        queryset=Role.objects.all().order_by("name"),
    )

    def filter_is_superuser(self, queryset, name, value):
        """Keep (value true) or drop (value false) users that are in a group
        with `is_superuser` set"""
        if value:
            return queryset.filter(groups__is_superuser=True).distinct()
        return queryset.exclude(groups__is_superuser=True).distinct()

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args

        `value` must be a JSON object; each key/value pair becomes an
        `attributes__<key>` lookup. Raises a ValidationError when `value` is not
        valid JSON or not an object. When building or running the lookup raises
        ValueError, the unfiltered queryset is returned."""
        try:
            value = loads(value)
        except ValueError:
            raise ValidationError(_("filter: failed to parse JSON")) from None
        if not isinstance(value, dict):
            raise ValidationError(_("filter: value must be key:value mapping"))
        qs = {f"attributes__{key}": _value for key, _value in value.items()}
        try:
            filtered = queryset.filter(**qs)
            # Run the lookup once so an unusable value surfaces here. exists()
            # does this without loading every matching row, and it leaves the
            # returned queryset unevaluated.
            filtered.exists()
        except ValueError:
            return queryset
        return filtered

    class Meta:
        model = User
        fields = [
            "username",
            "email",
            "date_joined",
            "last_updated",
            "last_login",
            "name",
            "is_active",
            "is_superuser",
            "attributes",
            "groups_by_name",
            "groups_by_pk",
            "roles_by_name",
            "roles_by_pk",
            "type",
        ]
Filter for users
def filter_attributes(self, queryset, name, value):
    """Filter attributes by query args

    `value` must be a JSON object; each key/value pair becomes an
    `attributes__<key>` lookup. Raises a ValidationError when `value` is not
    valid JSON or not an object. When building or running the lookup raises
    ValueError, the unfiltered queryset is returned."""
    try:
        value = loads(value)
    except ValueError:
        raise ValidationError(_("filter: failed to parse JSON")) from None
    if not isinstance(value, dict):
        raise ValidationError(_("filter: value must be key:value mapping"))
    qs = {f"attributes__{key}": _value for key, _value in value.items()}
    try:
        filtered = queryset.filter(**qs)
        # Run the lookup once so an unusable value surfaces here. exists()
        # does this without loading every matching row, and it leaves the
        # returned queryset unevaluated.
        filtered.exists()
    except ValueError:
        return queryset
    return filtered
Filter attributes by query args
class Meta:
    model = User
    fields = [
        # plain model fields
        "username",
        "email",
        "date_joined",
        "last_updated",
        "last_login",
        "name",
        "is_active",
        # filters declared on the filterset
        "is_superuser",
        "attributes",
        "groups_by_name",
        "groups_by_pk",
        "roles_by_name",
        "roles_by_pk",
        "type",
    ]
class UserViewSet(
    ConditionalInheritance("authentik.enterprise.reports.api.reports.ExportMixin"),
    UsedByMixin,
    ModelViewSet,
):
    """User Viewset"""

    queryset = User.objects.none()
    ordering = ["username", "date_joined", "last_updated", "last_login"]
    serializer_class = UserSerializer
    filterset_class = UsersFilter
    search_fields = ["email", "name", "uuid", "username"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        """Fields of the user model that are exposed to the query language search"""
        # Imported here rather than at module level
        from djangoql.schema import BoolField, StrField

        from authentik.enterprise.search.fields import (
            ChoiceSearchField,
            JSONSearchField,
        )

        return [
            StrField(User, "username"),
            StrField(User, "name"),
            StrField(User, "email"),
            StrField(User, "path"),
            BoolField(User, "is_active", nullable=True),
            ChoiceSearchField(User, "type"),
            JSONSearchField(User, "attributes"),
        ]

    def get_queryset(self):
        """All users except the anonymous user; groups and roles are prefetched
        only when the request asks for them to be included"""
        # One serializer instance is enough to evaluate both request flags
        serializer = self.serializer_class(context={"request": self.request})
        base_qs = User.objects.all().exclude_anonymous()
        if serializer._should_include_groups:
            base_qs = base_qs.prefetch_related("groups")
        if serializer._should_include_roles:
            base_qs = base_qs.prefetch_related("roles")
        return base_qs

    @extend_schema(
        parameters=[
            OpenApiParameter("include_groups", bool, default=True),
            OpenApiParameter("include_roles", bool, default=True),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Overridden only to document the include_groups/include_roles parameters
        return super().list(request, *args, **kwargs)

    def _create_recovery_link(
        self, token_duration: str | None, for_email=False
    ) -> tuple[str, Token]:
        """Create a recovery link (when the current brand has a recovery flow set),
        that can either be shown to an admin or sent to the user directly

        Returns the absolute link and the flow token backing it. Raises a
        ValidationError when no recovery flow is set or the flow is not
        applicable to the user."""
        brand: Brand = self.request.brand
        # Check that there is a recovery flow, if not return an error
        flow = brand.flow_recovery
        if not flow:
            raise ValidationError({"non_field_errors": _("No recovery flow set.")})
        user: User = self.get_object()
        planner = FlowPlanner(flow)
        planner.allow_empty_flows = True
        # The plan is computed with an anonymous user on the underlying request
        self.request._request.user = AnonymousUser()
        try:
            plan = planner.plan(
                self.request._request,
                {
                    PLAN_CONTEXT_PENDING_USER: user,
                },
            )
        except FlowNonApplicableException:
            raise ValidationError(
                {"non_field_errors": _("Recovery flow not applicable to user")}
            ) from None
        _plan = FlowToken.pickle(plan)
        if for_email:
            _plan = pickle_flow_token_for_email(plan)
        expires = default_token_duration()
        if token_duration:
            timedelta_string_validator(token_duration)
            expires = now() + timedelta_from_string(token_duration)
        # One reset token per user: an existing one is overwritten
        token, __ = FlowToken.objects.update_or_create(
            identifier=f"{user.uid}-password-reset",
            defaults={
                "user": user,
                "flow": flow,
                "_plan": _plan,
                "revoke_on_execution": not for_email,
                "expires": expires,
            },
        )
        querystring = urlencode({QS_KEY_TOKEN: token.key})
        link = self.request.build_absolute_uri(
            reverse_lazy("authentik_core:if-flow", kwargs={"flow_slug": flow.slug})
            + f"?{querystring}"
        )
        return link, token

    @permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
    @extend_schema(
        request=UserServiceAccountSerializer,
        responses={
            200: inline_serializer(
                "UserServiceAccountResponse",
                {
                    "username": CharField(required=True),
                    "token": CharField(required=True),
                    "user_uid": CharField(required=True),
                    "user_pk": IntegerField(required=True),
                    "group_pk": CharField(required=False),
                },
            )
        },
    )
    @action(
        detail=False,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
    )
    @validate(UserServiceAccountSerializer)
    def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
        """Create a new user account that is marked as a service account

        Creates the user, optionally a group of the same name (only if requested
        and the requesting user may add groups) and an app password token.
        Responds with status 400 on an IntegrityError and 500 on a
        ValueError/TypeError."""
        expires = body.validated_data.get("expires", now() + timedelta(days=360))

        username = body.validated_data["name"]
        expiring = body.validated_data["expiring"]
        # The try wraps the atomic block (not the other way round) so that any
        # handled exception first leaves the block and rolls the transaction back.
        # Otherwise a failure after the user was created could commit a
        # half-built service account.
        try:
            with atomic():
                user: User = User.objects.create(
                    username=username,
                    name=username,
                    type=UserTypes.SERVICE_ACCOUNT,
                    attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
                    path=USER_PATH_SERVICE_ACCOUNT,
                )
                user.set_unusable_password()
                user.save()

                response = {
                    "username": user.username,
                    "user_uid": user.uid,
                    "user_pk": user.pk,
                }
                if body.validated_data["create_group"] and self.request.user.has_perm(
                    "authentik_core.add_group"
                ):
                    group = Group.objects.create(name=username)
                    group.users.add(user)
                    response["group_pk"] = str(group.pk)
                token = Token.objects.create(
                    identifier=slugify(f"service-account-{username}-password"),
                    intent=TokenIntents.INTENT_APP_PASSWORD,
                    user=user,
                    expires=expires,
                    expiring=expiring,
                )
                response["token"] = token.key
                return Response(response)
        except IntegrityError as exc:
            error_msg = str(exc).lower()

            if "unique" in error_msg:
                return Response(
                    data={
                        "non_field_errors": [
                            _("A user/group with these details already exists")
                        ]
                    },
                    status=400,
                )
            else:
                LOGGER.warning("Service account creation failed", exc=exc)
                return Response(
                    data={"non_field_errors": [_("Unable to create user")]},
                    status=400,
                )
        except (ValueError, TypeError) as exc:
            LOGGER.error("Unexpected error during service account creation", exc=exc)
            return Response(
                data={"non_field_errors": [_("Unknown error occurred")]},
                status=500,
            )

    @extend_schema(responses={200: SessionUserSerializer(many=False)})
    @action(
        url_path="me",
        url_name="me",
        detail=False,
        pagination_class=None,
        filter_backends=[],
    )
    def user_me(self, request: Request) -> Response:
        """Get information about current user"""
        context = {"request": request}
        serializer = SessionUserSerializer(
            data={"user": UserSelfSerializer(instance=request.user, context=context).data}
        )
        # While impersonating, also return the user who started the impersonation
        if SESSION_KEY_IMPERSONATE_USER in request._request.session:
            serializer.initial_data["original"] = UserSelfSerializer(
                instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
                context=context,
            ).data
        self.request.session.modified = True
        return Response(serializer.initial_data)

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserPasswordSetSerializer,
        responses={
            204: OpenApiResponse(description="Successfully changed password"),
            400: OpenApiResponse(description="Bad request"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserPasswordSetSerializer)
    def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
        """Set password for user"""
        user: User = self.get_object()
        try:
            user.set_password(body.validated_data["password"], request=request)
            user.save()
        except (ValidationError, IntegrityError) as exc:
            LOGGER.debug("Failed to set password", exc=exc)
            return Response(status=400)
        # Changing your own password (while not impersonating) must not end
        # the current session
        if user.pk == request.user.pk and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
            LOGGER.debug("Updating session hash after password change")
            update_session_auth_hash(self.request, user)
        return Response(status=204)

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserRecoveryLinkSerializer,
        responses={
            "200": LinkSerializer(many=False),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(UserRecoveryLinkSerializer)
    def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
        """Create a temporary link that a user can use to recover their account"""
        # `__` rather than `_` for the unused token, so the translation
        # function `_` is not rebound
        link, __ = self._create_recovery_link(
            token_duration=body.validated_data.get("token_duration")
        )
        return Response({"link": link})

    @permission_required("authentik_core.reset_user_password")
    @extend_schema(
        request=UserRecoveryEmailSerializer,
        responses={
            "204": OpenApiResponse(description="Successfully sent recover email"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(UserRecoveryEmailSerializer)
    def recovery_email(
        self, request: Request, pk: int, body: UserRecoveryEmailSerializer
    ) -> Response:
        """Send an email with a temporary link that a user can use to recover their account"""
        email_error_message = _("User does not have an email address set.")
        stage_error_message = _("Email stage not found.")
        user: User = self.get_object()
        if not user.email:
            LOGGER.debug("User doesn't have an email address")
            raise ValidationError({"non_field_errors": email_error_message})
        if not (stage := EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()):
            LOGGER.debug("Email stage does not exist")
            raise ValidationError({"non_field_errors": stage_error_message})
        # A stage the requester cannot view yields the same error as a missing one
        if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
            LOGGER.debug("User has no view access to email stage")
            raise ValidationError({"non_field_errors": stage_error_message})
        link, token = self._create_recovery_link(
            token_duration=body.validated_data.get("token_duration"), for_email=True
        )
        message = TemplateEmailMessage(
            subject=_(stage.subject),
            to=[(user.name, user.email)],
            template_name=stage.template,
            language=user.locale(request),
            template_context={
                "url": link,
                "user": user,
                "expires": token.expires,
            },
        )
        send_mails(stage, message)
        return Response(status=204)

    @permission_required("authentik_core.impersonate")
    @extend_schema(
        request=inline_serializer(
            "ImpersonationSerializer",
            {
                "reason": CharField(required=True),
            },
        ),
        responses={
            204: OpenApiResponse(description="Successfully started impersonation"),
        },
    )
    @action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
    def impersonate(self, request: Request, pk: int) -> Response:
        """Impersonate a user"""
        if not request.tenant.impersonation:
            LOGGER.debug("User attempted to impersonate", user=request.user)
            return Response(status=401)
        user_to_be = self.get_object()
        reason = request.data.get("reason", "")
        # Check both object-level perms and global perms
        if not request.user.has_perm(
            "authentik_core.impersonate", user_to_be
        ) and not request.user.has_perm("authentik_core.impersonate"):
            LOGGER.debug(
                "User attempted to impersonate without permissions",
                user=request.user,
            )
            return Response(status=403)
        if user_to_be.pk == self.request.user.pk:
            LOGGER.debug("User attempted to impersonate themselves", user=request.user)
            return Response(status=401)
        if not reason and request.tenant.impersonation_require_reason:
            LOGGER.debug(
                "User attempted to impersonate without providing a reason",
                user=request.user,
            )
            raise ValidationError({"reason": _("This field is required.")})

        request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
        request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be

        Event.new(EventAction.IMPERSONATION_STARTED, reason=reason).from_http(request, user_to_be)

        return Response(status=204)

    @extend_schema(
        request=None,
        responses={
            "204": OpenApiResponse(description="Successfully ended impersonation"),
        },
    )
    @action(detail=False, methods=["GET"])
    def impersonate_end(self, request: Request) -> Response:
        """End the impersonation of a user"""
        # Nothing to end unless both session keys are present; still answers 204
        if (
            SESSION_KEY_IMPERSONATE_USER not in request.session
            or SESSION_KEY_IMPERSONATE_ORIGINAL_USER not in request.session
        ):
            LOGGER.debug("Can't end impersonation", user=request.user)
            return Response(status=204)

        original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]

        del request.session[SESSION_KEY_IMPERSONATE_USER]
        del request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]

        Event.new(EventAction.IMPERSONATION_ENDED).from_http(request, original_user)

        return Response(status=204)

    @extend_schema(
        responses={
            200: inline_serializer(
                "UserPathSerializer",
                {"paths": ListField(child=CharField(), read_only=True)},
            )
        },
        parameters=[
            OpenApiParameter(
                name="search",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.STR,
            )
        ],
    )
    @action(detail=False, pagination_class=None)
    def paths(self, request: Request) -> Response:
        """Get all user paths"""
        return Response(
            data={
                "paths": list(
                    self.filter_queryset(self.get_queryset())
                    .values("path")
                    .distinct()
                    .order_by("path")
                    .values_list("path", flat=True)
                )
            }
        )

    def partial_update(self, request: Request, *args, **kwargs) -> Response:
        """Partially update a user; when the user is inactive afterwards, all
        of their sessions are deleted"""
        response = super().partial_update(request, *args, **kwargs)
        instance: User = self.get_object()
        if not instance.is_active:
            Session.objects.filter(authenticatedsession__user=instance).delete()
            LOGGER.debug("Deleted user's sessions", user=instance.username)
        return response
User Viewset
def get_ql_fields(self):
    """Fields of the user model that are exposed to the query language search"""
    # Imported here rather than at module level
    from djangoql.schema import BoolField, StrField

    from authentik.enterprise.search.fields import (
        ChoiceSearchField,
        JSONSearchField,
    )

    # Plain text columns first, in the order they are exposed
    ql_fields = [
        StrField(User, column) for column in ("username", "name", "email", "path")
    ]
    ql_fields.append(BoolField(User, "is_active", nullable=True))
    ql_fields.append(ChoiceSearchField(User, "type"))
    ql_fields.append(JSONSearchField(User, "attributes"))
    return ql_fields
def get_queryset(self):
    """All users except the anonymous user; groups and roles are prefetched
    only when the request asks for them to be included"""
    # One serializer instance is enough to evaluate both request flags;
    # previously a second, identical serializer was built for the roles check
    serializer = self.serializer_class(context={"request": self.request})
    base_qs = User.objects.all().exclude_anonymous()
    if serializer._should_include_groups:
        base_qs = base_qs.prefetch_related("groups")
    if serializer._should_include_roles:
        base_qs = base_qs.prefetch_related("roles")
    return base_qs
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using self.queryset.
This method should always be used rather than accessing self.queryset
directly, as self.queryset gets evaluated only once, and those results
are cached for all subsequent requests.
You may want to override this if you need to provide different querysets depending on the incoming request.
(E.g. return a list of items that is specific to the user.)
@permission_required(None, ["authentik_core.add_user", "authentik_core.add_token"])
@extend_schema(
    request=UserServiceAccountSerializer,
    responses={
        200: inline_serializer(
            "UserServiceAccountResponse",
            {
                "username": CharField(required=True),
                "token": CharField(required=True),
                "user_uid": CharField(required=True),
                "user_pk": IntegerField(required=True),
                "group_pk": CharField(required=False),
            },
        )
    },
)
@action(
    detail=False,
    methods=["POST"],
    pagination_class=None,
    filter_backends=[],
)
@validate(UserServiceAccountSerializer)
def service_account(self, request: Request, body: UserServiceAccountSerializer) -> Response:
    """Create a new user account that is marked as a service account"""
    # Returns 200 with username/user_uid/user_pk/token (plus group_pk when a
    # group was created), 400 on an IntegrityError and 500 on ValueError/TypeError.
    # Tokens expire 360 days from now unless the request supplies "expires".
    expires = body.validated_data.get("expires", now() + timedelta(days=360))

    username = body.validated_data["name"]
    expiring = body.validated_data["expiring"]
    # The try wraps the atomic block (rather than living inside it) so that any
    # exception first leaves the block and rolls back every row created so far,
    # and only then is converted into an error response. Catching inside the
    # block would let it exit cleanly and commit a half-created account.
    try:
        with atomic():
            user: User = User.objects.create(
                username=username,
                name=username,
                type=UserTypes.SERVICE_ACCOUNT,
                attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: expiring},
                path=USER_PATH_SERVICE_ACCOUNT,
            )
            user.set_unusable_password()
            user.save()

            response = {
                "username": user.username,
                "user_uid": user.uid,
                "user_pk": user.pk,
            }
            # The group is only created when requested AND the caller may add groups;
            # otherwise the request still succeeds, just without "group_pk".
            if body.validated_data["create_group"] and self.request.user.has_perm(
                "authentik_core.add_group"
            ):
                group = Group.objects.create(name=username)
                group.users.add(user)
                response["group_pk"] = str(group.pk)
            token = Token.objects.create(
                identifier=slugify(f"service-account-{username}-password"),
                intent=TokenIntents.INTENT_APP_PASSWORD,
                user=user,
                expires=expires,
                expiring=expiring,
            )
            response["token"] = token.key
    except IntegrityError as exc:
        error_msg = str(exc).lower()

        # Distinguish uniqueness conflicts from other integrity failures by message text.
        if "unique" in error_msg:
            return Response(
                data={
                    "non_field_errors": [
                        _("A user/group with these details already exists")
                    ]
                },
                status=400,
            )
        LOGGER.warning("Service account creation failed", exc=exc)
        return Response(
            data={"non_field_errors": [_("Unable to create user")]},
            status=400,
        )
    except (ValueError, TypeError) as exc:
        LOGGER.error("Unexpected error during service account creation", exc=exc)
        return Response(
            data={"non_field_errors": [_("Unknown error occurred")]},
            status=500,
        )
    return Response(response)
Create a new user account that is marked as a service account
@extend_schema(responses={200: SessionUserSerializer(many=False)})
@action(
    url_path="me",
    url_name="me",
    detail=False,
    pagination_class=None,
    filter_backends=[],
)
def user_me(self, request: Request) -> Response:
    """Get information about current user"""
    serializer_context = {"request": request}
    current_user = UserSelfSerializer(instance=request.user, context=serializer_context)
    serializer = SessionUserSerializer(data={"user": current_user.data})
    # While impersonating, also report the original user stored in the session.
    if SESSION_KEY_IMPERSONATE_USER in request._request.session:
        original_user = UserSelfSerializer(
            instance=request._request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER],
            context=serializer_context,
        )
        serializer.initial_data["original"] = original_user.data
    # Flag the session as modified so it gets saved with this response.
    self.request.session.modified = True
    # The serializer is never validated; its initial data is returned as-is.
    return Response(serializer.initial_data)
Get information about current user
@permission_required("authentik_core.reset_user_password")
@extend_schema(
    request=UserPasswordSetSerializer,
    responses={
        204: OpenApiResponse(description="Successfully changed password"),
        400: OpenApiResponse(description="Bad request"),
    },
)
@action(
    detail=True,
    methods=["POST"],
    permission_classes=[IsAuthenticated],
)
@validate(UserPasswordSetSerializer)
def set_password(self, request: Request, pk: int, body: UserPasswordSetSerializer) -> Response:
    """Set password for user"""
    target: User = self.get_object()
    new_password = body.validated_data["password"]
    try:
        target.set_password(new_password, request=request)
        target.save()
    except (ValidationError, IntegrityError) as exc:
        LOGGER.debug("Failed to set password", exc=exc)
        return Response(status=400)
    # Refresh the session auth hash only when the caller changed their own
    # password and is not currently impersonating someone else.
    changed_own_password = target.pk == request.user.pk
    if changed_own_password and SESSION_KEY_IMPERSONATE_USER not in self.request.session:
        LOGGER.debug("Updating session hash after password change")
        update_session_auth_hash(self.request, target)
    return Response(status=204)
Set password for user
@permission_required("authentik_core.reset_user_password")
@extend_schema(
    request=UserRecoveryLinkSerializer,
    responses={
        "200": LinkSerializer(many=False),
    },
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(UserRecoveryLinkSerializer)
def recovery(self, request: Request, pk: int, body: UserRecoveryLinkSerializer) -> Response:
    """Create a temporary link that a user can use to recover their account"""
    requested_duration = body.validated_data.get("token_duration")
    # The helper returns a pair; only the link is needed here.
    recovery_link, _token = self._create_recovery_link(token_duration=requested_duration)
    return Response({"link": recovery_link})
Create a temporary link that a user can use to recover their account
@permission_required("authentik_core.reset_user_password")
@extend_schema(
    request=UserRecoveryEmailSerializer,
    responses={
        "204": OpenApiResponse(description="Successfully sent recover email"),
    },
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(UserRecoveryEmailSerializer)
def recovery_email(
    self, request: Request, pk: int, body: UserRecoveryEmailSerializer
) -> Response:
    """Send an email with a temporary link that a user can use to recover their account"""
    missing_email_error = _("User does not have an email address set.")
    missing_stage_error = _("Email stage not found.")
    recipient: User = self.get_object()
    if not recipient.email:
        LOGGER.debug("User doesn't have an email address")
        raise ValidationError({"non_field_errors": missing_email_error})

    stage = EmailStage.objects.filter(pk=body.validated_data["email_stage"]).first()
    if not stage:
        LOGGER.debug("Email stage does not exist")
        raise ValidationError({"non_field_errors": missing_stage_error})
    # A stage the caller may not view is reported with the same message as a missing one.
    if not request.user.has_perm("authentik_stages_email.view_emailstage", stage):
        LOGGER.debug("User has no view access to email stage")
        raise ValidationError({"non_field_errors": missing_stage_error})

    recovery_link, recovery_token = self._create_recovery_link(
        token_duration=body.validated_data.get("token_duration"), for_email=True
    )
    template_context = {
        "url": recovery_link,
        "user": recipient,
        "expires": recovery_token.expires,
    }
    message = TemplateEmailMessage(
        subject=_(stage.subject),
        to=[(recipient.name, recipient.email)],
        template_name=stage.template,
        language=recipient.locale(request),
        template_context=template_context,
    )
    send_mails(stage, message)
    return Response(status=204)
Send an email with a temporary link that a user can use to recover their account
@permission_required("authentik_core.impersonate")
@extend_schema(
    request=inline_serializer(
        "ImpersonationSerializer",
        {
            "reason": CharField(required=True),
        },
    ),
    responses={
        204: OpenApiResponse(description="Successfully started impersonation"),
    },
)
@action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
def impersonate(self, request: Request, pk: int) -> Response:
    """Impersonate a user"""
    # Impersonation can be switched off per tenant.
    if not request.tenant.impersonation:
        LOGGER.debug("User attempted to impersonate", user=request.user)
        return Response(status=401)

    user_to_be = self.get_object()
    reason = request.data.get("reason", "")

    # Either the object-level permission or the global permission is sufficient.
    may_impersonate = request.user.has_perm(
        "authentik_core.impersonate", user_to_be
    ) or request.user.has_perm("authentik_core.impersonate")
    if not may_impersonate:
        LOGGER.debug(
            "User attempted to impersonate without permissions",
            user=request.user,
        )
        return Response(status=403)

    if user_to_be.pk == self.request.user.pk:
        LOGGER.debug("User attempted to impersonate themselves", user=request.user)
        return Response(status=401)

    if request.tenant.impersonation_require_reason and not reason:
        LOGGER.debug(
            "User attempted to impersonate without providing a reason",
            user=request.user,
        )
        raise ValidationError({"reason": _("This field is required.")})

    # Store both users in the session; their presence marks an active impersonation.
    request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER] = request.user
    request.session[SESSION_KEY_IMPERSONATE_USER] = user_to_be

    started = Event.new(EventAction.IMPERSONATION_STARTED, reason=reason)
    started.from_http(request, user_to_be)

    return Response(status=204)
Impersonate a user
@extend_schema(
    request=None,
    responses={
        "204": OpenApiResponse(description="Successfully ended impersonation"),
    },
)
@action(detail=False, methods=["GET"])
def impersonate_end(self, request: Request) -> Response:
    """End Impersonation a user"""
    impersonation_active = (
        SESSION_KEY_IMPERSONATE_USER in request.session
        and SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session
    )
    # Nothing to undo: still answer 204 so the call is safe to repeat.
    if not impersonation_active:
        LOGGER.debug("Can't end impersonation", user=request.user)
        return Response(status=204)

    # Read the original user before its session key is removed below.
    original_user = request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER]

    for session_key in (SESSION_KEY_IMPERSONATE_USER, SESSION_KEY_IMPERSONATE_ORIGINAL_USER):
        del request.session[session_key]

    ended = Event.new(EventAction.IMPERSONATION_ENDED)
    ended.from_http(request, original_user)

    return Response(status=204)
End Impersonation a user
@extend_schema(
    responses={
        200: inline_serializer(
            "UserPathSerializer",
            {"paths": ListField(child=CharField(), read_only=True)},
        )
    },
    parameters=[
        OpenApiParameter(
            name="search",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.STR,
        )
    ],
)
@action(detail=False, pagination_class=None)
def paths(self, request: Request) -> Response:
    """Get all user paths"""
    # Apply the view's filter backends first, then reduce to the distinct,
    # sorted path values.
    filtered_users = self.filter_queryset(self.get_queryset())
    distinct_paths = (
        filtered_users.values("path")
        .distinct()
        .order_by("path")
        .values_list("path", flat=True)
    )
    return Response(data={"paths": list(distinct_paths)})
Get all user paths
def partial_update(self, request: Request, *args, **kwargs) -> Response:
    # Run the regular partial update, then delete the sessions of a user who
    # is inactive afterwards. The parent's response is returned unchanged.
    response = super().partial_update(request, *args, **kwargs)
    # Re-fetch the object so is_active reflects the state after the update.
    updated_user: User = self.get_object()
    if updated_user.is_active:
        return response
    Session.objects.filter(authenticatedsession__user=updated_user).delete()
    LOGGER.debug("Deleted user's sessions", user=updated_user.username)
    return response