authentik.enterprise.stages.account_lockdown.api

Account Lockdown Stage API Views

  1"""Account Lockdown Stage API Views"""
  2
  3from django.utils.translation import gettext as _
  4from drf_spectacular.utils import OpenApiExample, OpenApiResponse, extend_schema
  5from rest_framework.decorators import action
  6from rest_framework.exceptions import ValidationError
  7from rest_framework.permissions import IsAuthenticated
  8from rest_framework.request import Request
  9from rest_framework.response import Response
 10from rest_framework.serializers import PrimaryKeyRelatedField
 11from rest_framework.viewsets import ModelViewSet
 12from structlog.stdlib import get_logger
 13
 14from authentik.api.validation import validate
 15from authentik.core.api.used_by import UsedByMixin
 16from authentik.core.api.utils import LinkSerializer, PassiveSerializer
 17from authentik.core.models import (
 18    User,
 19)
 20from authentik.enterprise.api import EnterpriseRequiredMixin, enterprise_action
 21from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage
 22from authentik.enterprise.stages.account_lockdown.stage import (
 23    can_lock_user,
 24    get_lockdown_target_users,
 25)
 26from authentik.flows.api.stages import StageSerializer
 27from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 28from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner
 29
# Module-level structlog logger; used by the account_lockdown API action
# below for its debug messages.
LOGGER = get_logger()
 31
 32
 33class AccountLockdownStageSerializer(EnterpriseRequiredMixin, StageSerializer):
 34    """AccountLockdownStage Serializer"""
 35
 36    class Meta:
 37        model = AccountLockdownStage
 38        fields = StageSerializer.Meta.fields + [
 39            "deactivate_user",
 40            "set_unusable_password",
 41            "delete_sessions",
 42            "revoke_tokens",
 43            "self_service_completion_flow",
 44        ]
 45
 46
 47class AccountLockdownStageViewSet(UsedByMixin, ModelViewSet):
 48    """AccountLockdownStage Viewset"""
 49
 50    queryset = AccountLockdownStage.objects.all()
 51    serializer_class = AccountLockdownStageSerializer
 52    filterset_fields = "__all__"
 53    ordering = ["name"]
 54    search_fields = ["name"]
 55
 56
 57class UserAccountLockdownSerializer(PassiveSerializer):
 58    """Choose the target account before starting the lockdown flow."""
 59
 60    user = PrimaryKeyRelatedField(
 61        queryset=get_lockdown_target_users(),
 62        required=False,
 63        allow_null=True,
 64        help_text=_("User to lock. If omitted, locks the current user (self-service)."),
 65    )
 66
 67
 68class UserAccountLockdownMixin:
 69    """Enterprise account-lockdown API actions for UserViewSet."""
 70
 71    def _create_lockdown_flow_url(self, request: Request, user: User) -> str:
 72        """Create a flow URL for account lockdown.
 73
 74        The request body selects the target before the flow starts. The API
 75        pre-plans the lockdown flow with the target as the pending user, so the
 76        account lockdown stage can use the normal flow context.
 77        """
 78        flow = request._request.brand.flow_lockdown
 79        if flow is None:
 80            raise ValidationError({"non_field_errors": [_("No lockdown flow configured.")]})
 81        planner = FlowPlanner(flow)
 82        planner.use_cache = False
 83        try:
 84            plan = planner.plan(request._request, {PLAN_CONTEXT_PENDING_USER: user})
 85        except EmptyFlowException, FlowNonApplicableException:
 86            raise ValidationError(
 87                {"non_field_errors": [_("Lockdown flow is not applicable.")]}
 88            ) from None
 89        return plan.to_redirect(request._request, flow).url
 90
 91    @extend_schema(
 92        description=_("Choose the target account, then return a flow link."),
 93        request=UserAccountLockdownSerializer,
 94        responses={
 95            "200": OpenApiResponse(
 96                response=LinkSerializer,
 97                examples=[
 98                    OpenApiExample(
 99                        "Lockdown flow URL",
100                        value={
101                            "link": "https://example.invalid/if/flow/default-account-lockdown/",
102                        },
103                        response_only=True,
104                        status_codes=["200"],
105                    )
106                ],
107            ),
108            "400": OpenApiResponse(
109                description=_("No lockdown flow configured or the flow is not applicable")
110            ),
111            "403": OpenApiResponse(
112                description=_("Permission denied (when targeting another user)")
113            ),
114        },
115    )
116    @action(
117        detail=False,
118        methods=["POST"],
119        permission_classes=[IsAuthenticated],
120        url_path="account_lockdown",
121    )
122    @validate(UserAccountLockdownSerializer)
123    @enterprise_action
124    def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response:
125        """Trigger account lockdown for a user.
126
127        If no user is specified, locks the current user (self-service).
128        When targeting another user, admin permissions are required.
129
130        Returns a flow link for the frontend to follow. The flow is pre-planned
131        with the target user as pending user for the lockdown stage.
132        """
133        user = body.validated_data.get("user") or request.user
134
135        if not can_lock_user(request.user, user):
136            LOGGER.debug("Permission denied for account lockdown", user=request.user)
137            self.permission_denied(request)
138
139        flow_url = self._create_lockdown_flow_url(request, user)
140        LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username)
141        return Response({"link": flow_url})
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class AccountLockdownStageSerializer(EnterpriseRequiredMixin, StageSerializer):
    """AccountLockdownStage Serializer"""

    class Meta:
        # Serializes AccountLockdownStage: the common stage fields inherited
        # from StageSerializer come first, followed by the fields that are
        # specific to the lockdown stage.
        model = AccountLockdownStage
        fields = [
            *StageSerializer.Meta.fields,
            "deactivate_user",
            "set_unusable_password",
            "delete_sessions",
            "revoke_tokens",
            "self_service_completion_flow",
        ]

AccountLockdownStage Serializer

class AccountLockdownStageSerializer.Meta:
37    class Meta:
38        model = AccountLockdownStage
39        fields = StageSerializer.Meta.fields + [
40            "deactivate_user",
41            "set_unusable_password",
42            "delete_sessions",
43            "revoke_tokens",
44            "self_service_completion_flow",
45        ]
fields = ['pk', 'name', 'component', 'verbose_name', 'verbose_name_plural', 'meta_model_name', 'flow_set', 'deactivate_user', 'set_unusable_password', 'delete_sessions', 'revoke_tokens', 'self_service_completion_flow']
class AccountLockdownStageViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class AccountLockdownStageViewSet(UsedByMixin, ModelViewSet):
    """AccountLockdownStage Viewset"""

    # Which objects the endpoints operate on and how they are (de)serialized.
    serializer_class = AccountLockdownStageSerializer
    queryset = AccountLockdownStage.objects.all()

    # List endpoint configuration: search and default ordering use the stage
    # name; filtering is declared for all fields.
    search_fields = ["name"]
    ordering = ["name"]
    filterset_fields = "__all__"

AccountLockdownStage Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'AccountLockdownStageSerializer'>
filterset_fields = '__all__'
ordering = ['name']
search_fields = ['name']
name = None
description = None
suffix = None
detail = None
basename = None
class UserAccountLockdownSerializer(authentik.core.api.utils.PassiveSerializer):
class UserAccountLockdownSerializer(PassiveSerializer):
    """Choose the target account before starting the lockdown flow."""

    # Optional target account. The key may be absent or null, in which case the
    # account_lockdown action falls back to the requesting user. Only primary
    # keys found in get_lockdown_target_users() are accepted.
    user = PrimaryKeyRelatedField(
        help_text=_("User to lock. If omitted, locks the current user (self-service)."),
        allow_null=True,
        required=False,
        queryset=get_lockdown_target_users(),
    )

Choose the target account before starting the lockdown flow.

user
class UserAccountLockdownMixin:
 69class UserAccountLockdownMixin:
 70    """Enterprise account-lockdown API actions for UserViewSet."""
 71
 72    def _create_lockdown_flow_url(self, request: Request, user: User) -> str:
 73        """Create a flow URL for account lockdown.
 74
 75        The request body selects the target before the flow starts. The API
 76        pre-plans the lockdown flow with the target as the pending user, so the
 77        account lockdown stage can use the normal flow context.
 78        """
 79        flow = request._request.brand.flow_lockdown
 80        if flow is None:
 81            raise ValidationError({"non_field_errors": [_("No lockdown flow configured.")]})
 82        planner = FlowPlanner(flow)
 83        planner.use_cache = False
 84        try:
 85            plan = planner.plan(request._request, {PLAN_CONTEXT_PENDING_USER: user})
 86        except EmptyFlowException, FlowNonApplicableException:
 87            raise ValidationError(
 88                {"non_field_errors": [_("Lockdown flow is not applicable.")]}
 89            ) from None
 90        return plan.to_redirect(request._request, flow).url
 91
 92    @extend_schema(
 93        description=_("Choose the target account, then return a flow link."),
 94        request=UserAccountLockdownSerializer,
 95        responses={
 96            "200": OpenApiResponse(
 97                response=LinkSerializer,
 98                examples=[
 99                    OpenApiExample(
100                        "Lockdown flow URL",
101                        value={
102                            "link": "https://example.invalid/if/flow/default-account-lockdown/",
103                        },
104                        response_only=True,
105                        status_codes=["200"],
106                    )
107                ],
108            ),
109            "400": OpenApiResponse(
110                description=_("No lockdown flow configured or the flow is not applicable")
111            ),
112            "403": OpenApiResponse(
113                description=_("Permission denied (when targeting another user)")
114            ),
115        },
116    )
117    @action(
118        detail=False,
119        methods=["POST"],
120        permission_classes=[IsAuthenticated],
121        url_path="account_lockdown",
122    )
123    @validate(UserAccountLockdownSerializer)
124    @enterprise_action
125    def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response:
126        """Trigger account lockdown for a user.
127
128        If no user is specified, locks the current user (self-service).
129        When targeting another user, admin permissions are required.
130
131        Returns a flow link for the frontend to follow. The flow is pre-planned
132        with the target user as pending user for the lockdown stage.
133        """
134        user = body.validated_data.get("user") or request.user
135
136        if not can_lock_user(request.user, user):
137            LOGGER.debug("Permission denied for account lockdown", user=request.user)
138            self.permission_denied(request)
139
140        flow_url = self._create_lockdown_flow_url(request, user)
141        LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username)
142        return Response({"link": flow_url})

Enterprise account-lockdown API actions for UserViewSet.

@extend_schema(description=_('Choose the target account, then return a flow link.'), request=UserAccountLockdownSerializer, responses={'200': OpenApiResponse(response=LinkSerializer, examples=[OpenApiExample('Lockdown flow URL', value={'link': 'https://example.invalid/if/flow/default-account-lockdown/'}, response_only=True, status_codes=['200'])]), '400': OpenApiResponse(description=_('No lockdown flow configured or the flow is not applicable')), '403': OpenApiResponse(description=_('Permission denied (when targeting another user)'))})
@action(detail=False, methods=['POST'], permission_classes=[IsAuthenticated], url_path='account_lockdown')
@validate(UserAccountLockdownSerializer)
@enterprise_action
def account_lockdown( self, request: rest_framework.request.Request, body: UserAccountLockdownSerializer) -> rest_framework.response.Response:
 92    @extend_schema(
 93        description=_("Choose the target account, then return a flow link."),
 94        request=UserAccountLockdownSerializer,
 95        responses={
 96            "200": OpenApiResponse(
 97                response=LinkSerializer,
 98                examples=[
 99                    OpenApiExample(
100                        "Lockdown flow URL",
101                        value={
102                            "link": "https://example.invalid/if/flow/default-account-lockdown/",
103                        },
104                        response_only=True,
105                        status_codes=["200"],
106                    )
107                ],
108            ),
109            "400": OpenApiResponse(
110                description=_("No lockdown flow configured or the flow is not applicable")
111            ),
112            "403": OpenApiResponse(
113                description=_("Permission denied (when targeting another user)")
114            ),
115        },
116    )
117    @action(
118        detail=False,
119        methods=["POST"],
120        permission_classes=[IsAuthenticated],
121        url_path="account_lockdown",
122    )
123    @validate(UserAccountLockdownSerializer)
124    @enterprise_action
125    def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response:
126        """Trigger account lockdown for a user.
127
128        If no user is specified, locks the current user (self-service).
129        When targeting another user, admin permissions are required.
130
131        Returns a flow link for the frontend to follow. The flow is pre-planned
132        with the target user as pending user for the lockdown stage.
133        """
134        user = body.validated_data.get("user") or request.user
135
136        if not can_lock_user(request.user, user):
137            LOGGER.debug("Permission denied for account lockdown", user=request.user)
138            self.permission_denied(request)
139
140        flow_url = self._create_lockdown_flow_url(request, user)
141        LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username)
142        return Response({"link": flow_url})

Trigger account lockdown for a user.

If no user is specified, locks the current user (self-service). When targeting another user, admin permissions are required.

Returns a flow link for the frontend to follow. The flow is pre-planned with the target user as pending user for the lockdown stage.