authentik.enterprise.stages.account_lockdown.api
Account Lockdown Stage API Views
1"""Account Lockdown Stage API Views""" 2 3from django.utils.translation import gettext as _ 4from drf_spectacular.utils import OpenApiExample, OpenApiResponse, extend_schema 5from rest_framework.decorators import action 6from rest_framework.exceptions import ValidationError 7from rest_framework.permissions import IsAuthenticated 8from rest_framework.request import Request 9from rest_framework.response import Response 10from rest_framework.serializers import PrimaryKeyRelatedField 11from rest_framework.viewsets import ModelViewSet 12from structlog.stdlib import get_logger 13 14from authentik.api.validation import validate 15from authentik.core.api.used_by import UsedByMixin 16from authentik.core.api.utils import LinkSerializer, PassiveSerializer 17from authentik.core.models import ( 18 User, 19) 20from authentik.enterprise.api import EnterpriseRequiredMixin, enterprise_action 21from authentik.enterprise.stages.account_lockdown.models import AccountLockdownStage 22from authentik.enterprise.stages.account_lockdown.stage import ( 23 can_lock_user, 24 get_lockdown_target_users, 25) 26from authentik.flows.api.stages import StageSerializer 27from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException 28from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlanner 29 30LOGGER = get_logger() 31 32 33class AccountLockdownStageSerializer(EnterpriseRequiredMixin, StageSerializer): 34 """AccountLockdownStage Serializer""" 35 36 class Meta: 37 model = AccountLockdownStage 38 fields = StageSerializer.Meta.fields + [ 39 "deactivate_user", 40 "set_unusable_password", 41 "delete_sessions", 42 "revoke_tokens", 43 "self_service_completion_flow", 44 ] 45 46 47class AccountLockdownStageViewSet(UsedByMixin, ModelViewSet): 48 """AccountLockdownStage Viewset""" 49 50 queryset = AccountLockdownStage.objects.all() 51 serializer_class = AccountLockdownStageSerializer 52 filterset_fields = "__all__" 53 ordering = ["name"] 54 search_fields = ["name"] 55 56 57class UserAccountLockdownSerializer(PassiveSerializer): 58 """Choose the target account before starting the lockdown flow.""" 59 60 user = PrimaryKeyRelatedField( 61 queryset=get_lockdown_target_users(), 62 required=False, 63 allow_null=True, 64 help_text=_("User to lock. If omitted, locks the current user (self-service)."), 65 ) 66 67 68class UserAccountLockdownMixin: 69 """Enterprise account-lockdown API actions for UserViewSet.""" 70 71 def _create_lockdown_flow_url(self, request: Request, user: User) -> str: 72 """Create a flow URL for account lockdown. 73 74 The request body selects the target before the flow starts. The API 75 pre-plans the lockdown flow with the target as the pending user, so the 76 account lockdown stage can use the normal flow context. 77 """ 78 flow = request._request.brand.flow_lockdown 79 if flow is None: 80 raise ValidationError({"non_field_errors": [_("No lockdown flow configured.")]}) 81 planner = FlowPlanner(flow) 82 planner.use_cache = False 83 try: 84 plan = planner.plan(request._request, {PLAN_CONTEXT_PENDING_USER: user}) 85 except EmptyFlowException, FlowNonApplicableException: 86 raise ValidationError( 87 {"non_field_errors": [_("Lockdown flow is not applicable.")]} 88 ) from None 89 return plan.to_redirect(request._request, flow).url 90 91 @extend_schema( 92 description=_("Choose the target account, then return a flow link."), 93 request=UserAccountLockdownSerializer, 94 responses={ 95 "200": OpenApiResponse( 96 response=LinkSerializer, 97 examples=[ 98 OpenApiExample( 99 "Lockdown flow URL", 100 value={ 101 "link": "https://example.invalid/if/flow/default-account-lockdown/", 102 }, 103 response_only=True, 104 status_codes=["200"], 105 ) 106 ], 107 ), 108 "400": OpenApiResponse( 109 description=_("No lockdown flow configured or the flow is not applicable") 110 ), 111 "403": OpenApiResponse( 112 description=_("Permission denied (when targeting another user)") 113 ), 114 }, 115 ) 116 @action( 117 detail=False, 118 methods=["POST"], 119 permission_classes=[IsAuthenticated], 120 url_path="account_lockdown", 121 ) 122 @validate(UserAccountLockdownSerializer) 123 @enterprise_action 124 def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response: 125 """Trigger account lockdown for a user. 126 127 If no user is specified, locks the current user (self-service). 128 When targeting another user, admin permissions are required. 129 130 Returns a flow link for the frontend to follow. The flow is pre-planned 131 with the target user as pending user for the lockdown stage. 132 """ 133 user = body.validated_data.get("user") or request.user 134 135 if not can_lock_user(request.user, user): 136 LOGGER.debug("Permission denied for account lockdown", user=request.user) 137 self.permission_denied(request) 138 139 flow_url = self._create_lockdown_flow_url(request, user) 140 LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username) 141 return Response({"link": flow_url})
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
AccountLockdownStageSerializer(authentik.enterprise.api.EnterpriseRequiredMixin, authentik.flows.api.stages.StageSerializer):
34class AccountLockdownStageSerializer(EnterpriseRequiredMixin, StageSerializer): 35 """AccountLockdownStage Serializer""" 36 37 class Meta: 38 model = AccountLockdownStage 39 fields = StageSerializer.Meta.fields + [ 40 "deactivate_user", 41 "set_unusable_password", 42 "delete_sessions", 43 "revoke_tokens", 44 "self_service_completion_flow", 45 ]
AccountLockdownStage Serializer
Inherited Members
class
AccountLockdownStageSerializer.Meta:
37 class Meta: 38 model = AccountLockdownStage 39 fields = StageSerializer.Meta.fields + [ 40 "deactivate_user", 41 "set_unusable_password", 42 "delete_sessions", 43 "revoke_tokens", 44 "self_service_completion_flow", 45 ]
model =
<class 'authentik.enterprise.stages.account_lockdown.models.AccountLockdownStage'>
class
AccountLockdownStageViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
48class AccountLockdownStageViewSet(UsedByMixin, ModelViewSet): 49 """AccountLockdownStage Viewset""" 50 51 queryset = AccountLockdownStage.objects.all() 52 serializer_class = AccountLockdownStageSerializer 53 filterset_fields = "__all__" 54 ordering = ["name"] 55 search_fields = ["name"]
AccountLockdownStage Viewset
serializer_class =
<class 'AccountLockdownStageSerializer'>
Inherited Members
58class UserAccountLockdownSerializer(PassiveSerializer): 59 """Choose the target account before starting the lockdown flow.""" 60 61 user = PrimaryKeyRelatedField( 62 queryset=get_lockdown_target_users(), 63 required=False, 64 allow_null=True, 65 help_text=_("User to lock. If omitted, locks the current user (self-service)."), 66 )
Choose the target account before starting the lockdown flow.
Inherited Members
class
UserAccountLockdownMixin:
69class UserAccountLockdownMixin: 70 """Enterprise account-lockdown API actions for UserViewSet.""" 71 72 def _create_lockdown_flow_url(self, request: Request, user: User) -> str: 73 """Create a flow URL for account lockdown. 74 75 The request body selects the target before the flow starts. The API 76 pre-plans the lockdown flow with the target as the pending user, so the 77 account lockdown stage can use the normal flow context. 78 """ 79 flow = request._request.brand.flow_lockdown 80 if flow is None: 81 raise ValidationError({"non_field_errors": [_("No lockdown flow configured.")]}) 82 planner = FlowPlanner(flow) 83 planner.use_cache = False 84 try: 85 plan = planner.plan(request._request, {PLAN_CONTEXT_PENDING_USER: user}) 86 except EmptyFlowException, FlowNonApplicableException: 87 raise ValidationError( 88 {"non_field_errors": [_("Lockdown flow is not applicable.")]} 89 ) from None 90 return plan.to_redirect(request._request, flow).url 91 92 @extend_schema( 93 description=_("Choose the target account, then return a flow link."), 94 request=UserAccountLockdownSerializer, 95 responses={ 96 "200": OpenApiResponse( 97 response=LinkSerializer, 98 examples=[ 99 OpenApiExample( 100 "Lockdown flow URL", 101 value={ 102 "link": "https://example.invalid/if/flow/default-account-lockdown/", 103 }, 104 response_only=True, 105 status_codes=["200"], 106 ) 107 ], 108 ), 109 "400": OpenApiResponse( 110 description=_("No lockdown flow configured or the flow is not applicable") 111 ), 112 "403": OpenApiResponse( 113 description=_("Permission denied (when targeting another user)") 114 ), 115 }, 116 ) 117 @action( 118 detail=False, 119 methods=["POST"], 120 permission_classes=[IsAuthenticated], 121 url_path="account_lockdown", 122 ) 123 @validate(UserAccountLockdownSerializer) 124 @enterprise_action 125 def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response: 126 """Trigger account lockdown for a user. 127 128 If no user is specified, locks the current user (self-service). 129 When targeting another user, admin permissions are required. 130 131 Returns a flow link for the frontend to follow. The flow is pre-planned 132 with the target user as pending user for the lockdown stage. 133 """ 134 user = body.validated_data.get("user") or request.user 135 136 if not can_lock_user(request.user, user): 137 LOGGER.debug("Permission denied for account lockdown", user=request.user) 138 self.permission_denied(request) 139 140 flow_url = self._create_lockdown_flow_url(request, user) 141 LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username) 142 return Response({"link": flow_url})
Enterprise account-lockdown API actions for UserViewSet.
@extend_schema(description=_('Choose the target account, then return a flow link.'), request=UserAccountLockdownSerializer, responses={'200': OpenApiResponse(response=LinkSerializer, examples=[OpenApiExample('Lockdown flow URL', value={'link': 'https://example.invalid/if/flow/default-account-lockdown/'}, response_only=True, status_codes=['200'])]), '400': OpenApiResponse(description=_('No lockdown flow configured or the flow is not applicable')), '403': OpenApiResponse(description=_('Permission denied (when targeting another user)'))})
@action(detail=False, methods=['POST'], permission_classes=[IsAuthenticated], url_path='account_lockdown')
@validate(UserAccountLockdownSerializer)
@enterprise_action
def
account_lockdown( self, request: rest_framework.request.Request, body: UserAccountLockdownSerializer) -> rest_framework.response.Response:
92 @extend_schema( 93 description=_("Choose the target account, then return a flow link."), 94 request=UserAccountLockdownSerializer, 95 responses={ 96 "200": OpenApiResponse( 97 response=LinkSerializer, 98 examples=[ 99 OpenApiExample( 100 "Lockdown flow URL", 101 value={ 102 "link": "https://example.invalid/if/flow/default-account-lockdown/", 103 }, 104 response_only=True, 105 status_codes=["200"], 106 ) 107 ], 108 ), 109 "400": OpenApiResponse( 110 description=_("No lockdown flow configured or the flow is not applicable") 111 ), 112 "403": OpenApiResponse( 113 description=_("Permission denied (when targeting another user)") 114 ), 115 }, 116 ) 117 @action( 118 detail=False, 119 methods=["POST"], 120 permission_classes=[IsAuthenticated], 121 url_path="account_lockdown", 122 ) 123 @validate(UserAccountLockdownSerializer) 124 @enterprise_action 125 def account_lockdown(self, request: Request, body: UserAccountLockdownSerializer) -> Response: 126 """Trigger account lockdown for a user. 127 128 If no user is specified, locks the current user (self-service). 129 When targeting another user, admin permissions are required. 130 131 Returns a flow link for the frontend to follow. The flow is pre-planned 132 with the target user as pending user for the lockdown stage. 133 """ 134 user = body.validated_data.get("user") or request.user 135 136 if not can_lock_user(request.user, user): 137 LOGGER.debug("Permission denied for account lockdown", user=request.user) 138 self.permission_denied(request) 139 140 flow_url = self._create_lockdown_flow_url(request, user) 141 LOGGER.debug("Returning lockdown flow URL", flow_url=flow_url, user=user.username) 142 return Response({"link": flow_url})
Trigger account lockdown for a user.
If no user is specified, locks the current user (self-service). When targeting another user, admin permissions are required.
Returns a flow link for the frontend to follow. The flow is pre-planned with the target user as pending user for the lockdown stage.