authentik.stages.authenticator_duo.api
AuthenticatorDuoStage API Views
1"""AuthenticatorDuoStage API Views""" 2 3from typing import Any 4 5from django.http import Http404 6from drf_spectacular.types import OpenApiTypes 7from drf_spectacular.utils import OpenApiResponse, extend_schema, inline_serializer 8from guardian.shortcuts import get_objects_for_user 9from rest_framework import mixins 10from rest_framework.decorators import action 11from rest_framework.fields import CharField, ChoiceField, IntegerField 12from rest_framework.permissions import AllowAny 13from rest_framework.request import Request 14from rest_framework.response import Response 15from rest_framework.viewsets import GenericViewSet, ModelViewSet 16from structlog.stdlib import get_logger 17 18from authentik.api.validation import Serializer, validate 19from authentik.core.api.groups import PartialUserSerializer 20from authentik.core.api.used_by import UsedByMixin 21from authentik.core.api.utils import ModelSerializer 22from authentik.core.models import User 23from authentik.flows.api.stages import StageSerializer 24from authentik.flows.auth import FlowActive 25from authentik.flows.planner import FlowPlan 26from authentik.rbac.decorators import permission_required 27from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice 28from authentik.stages.authenticator_duo.stage import PLAN_CONTEXT_DUO_ENROLL 29 30LOGGER = get_logger() 31 32 33class AuthenticatorDuoStageSerializer(StageSerializer): 34 """AuthenticatorDuoStage Serializer""" 35 36 class Meta: 37 model = AuthenticatorDuoStage 38 fields = StageSerializer.Meta.fields + [ 39 "configure_flow", 40 "friendly_name", 41 "client_id", 42 "client_secret", 43 "api_hostname", 44 "admin_integration_key", 45 "admin_secret_key", 46 ] 47 extra_kwargs = { 48 "client_secret": {"write_only": True}, 49 "admin_secret_key": {"write_only": True}, 50 } 51 52 53class AuthenticatorDuoStageManualDeviceImport(Serializer): 54 duo_user_id = CharField(required=True) 55 username = CharField(required=True) 56 57 58class AuthenticatorDuoStageViewSet(UsedByMixin, ModelViewSet): 59 """AuthenticatorDuoStage Viewset""" 60 61 queryset = AuthenticatorDuoStage.objects.all() 62 serializer_class = AuthenticatorDuoStageSerializer 63 filterset_fields = [ 64 "name", 65 "configure_flow", 66 "client_id", 67 "api_hostname", 68 ] 69 search_fields = ["name"] 70 ordering = ["name"] 71 72 @extend_schema( 73 request=OpenApiTypes.NONE, 74 responses={ 75 200: inline_serializer( 76 "DuoDeviceEnrollmentStatusSerializer", 77 { 78 "duo_response": ChoiceField( 79 ( 80 ("success", "Success"), 81 ("waiting", "Waiting"), 82 ("invalid", "Invalid"), 83 ) 84 ) 85 }, 86 ), 87 }, 88 ) 89 @action( 90 methods=["POST"], 91 detail=True, 92 authentication_classes=[FlowActive], 93 permission_classes=[AllowAny], 94 ) 95 def enrollment_status(self, request: Request, pk: str) -> Response: 96 """Check enrollment status of user details in current session""" 97 stage: AuthenticatorDuoStage = AuthenticatorDuoStage.objects.filter(pk=pk).first() 98 if not stage: 99 raise Http404 100 client = stage.auth_client() 101 plan: FlowPlan = request.auth 102 enroll = plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 103 if not enroll: 104 return Response(status=400) 105 status = client.enroll_status(enroll["user_id"], enroll["activation_code"]) 106 return Response({"duo_response": status}) 107 108 @permission_required( 109 None, ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 110 ) 111 @extend_schema( 112 request=AuthenticatorDuoStageManualDeviceImport(), 113 responses={ 114 204: OpenApiResponse(description="Enrollment successful"), 115 400: OpenApiResponse(description="Bad request"), 116 }, 117 ) 118 @action(methods=["POST"], detail=True) 119 @validate(AuthenticatorDuoStageManualDeviceImport) 120 def import_device_manual( 121 self, request: Request, pk: str, body: AuthenticatorDuoStageManualDeviceImport 122 ) -> Response: 123 """Import duo devices into authentik""" 124 stage: AuthenticatorDuoStage = self.get_object() 125 user = ( 126 get_objects_for_user(request.user, "authentik_core.view_user") 127 .filter(username=body.validated_data.get("username", "")) 128 .first() 129 ) 130 if not user: 131 return Response(data={"non_field_errors": ["User does not exist."]}, status=400) 132 device = DuoDevice.objects.filter( 133 duo_user_id=body.validated_data.get("duo_user_id"), user=user, stage=stage 134 ).first() 135 if device: 136 return Response(data={"non_field_errors": ["Device exists already."]}, status=400) 137 DuoDevice.objects.create( 138 duo_user_id=body.validated_data.get("duo_user_id"), 139 user=user, 140 stage=stage, 141 confirmed=True, 142 name="Imported Duo Authenticator", 143 ) 144 return Response(status=204) 145 146 @permission_required( 147 "", ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 148 ) 149 @extend_schema( 150 request=None, 151 responses={ 152 200: inline_serializer( 153 "AuthenticatorDuoStageDeviceImportResponse", 154 fields={ 155 "count": IntegerField(read_only=True), 156 "error": CharField(read_only=True), 157 }, 158 ), 159 400: OpenApiResponse(description="Bad request"), 160 }, 161 ) 162 @action(methods=["POST"], detail=True) 163 def import_devices_automatic(self, request: Request, pk: str) -> Response: 164 """Import duo devices into authentik""" 165 stage: AuthenticatorDuoStage = self.get_object() 166 if stage.admin_integration_key == "": 167 return Response( 168 data={ 169 "non_field_errors": [ 170 "Stage does not have Admin API configured, " 171 "which is required for automatic imports." 172 ] 173 }, 174 status=400, 175 ) 176 result = self._duo_import_devices(stage) 177 return Response(data=result, status=200 if result["error"] == "" else 400) 178 179 def _duo_import_devices(self, stage: AuthenticatorDuoStage) -> dict[str, Any]: 180 """ 181 Import duo devices. This used to be a blocking task. 182 """ 183 created = 0 184 if stage.admin_integration_key == "": 185 LOGGER.info("Stage does not have admin integration configured", stage=stage) 186 return {"error": "Stage does not have admin integration configured", "count": created} 187 client = stage.admin_client() 188 try: 189 for duo_user in client.get_users_iterator(): 190 user_id = duo_user.get("user_id") 191 username = duo_user.get("username") 192 193 user = User.objects.filter(username=username).first() 194 if not user: 195 LOGGER.debug("User not found", username=username) 196 continue 197 device = DuoDevice.objects.filter( 198 duo_user_id=user_id, user=user, stage=stage 199 ).first() 200 if device: 201 LOGGER.debug("User already has a device with ID", id=user_id) 202 continue 203 DuoDevice.objects.create( 204 duo_user_id=user_id, 205 user=user, 206 stage=stage, 207 name="Imported Duo Authenticator", 208 ) 209 created += 1 210 return {"error": "", "count": created} 211 except RuntimeError as exc: 212 LOGGER.warning("failed to get users from duo", exc=exc) 213 return { 214 "error": "An internal error occurred while importing devices.", 215 "count": created, 216 } 217 218 219class DuoDeviceSerializer(ModelSerializer): 220 """Serializer for Duo authenticator devices""" 221 222 user = PartialUserSerializer(read_only=True) 223 224 class Meta: 225 model = DuoDevice 226 fields = ["pk", "name", "user"] 227 depth = 2 228 229 230class DuoDeviceViewSet( 231 mixins.RetrieveModelMixin, 232 mixins.UpdateModelMixin, 233 mixins.DestroyModelMixin, 234 UsedByMixin, 235 mixins.ListModelMixin, 236 GenericViewSet, 237): 238 """Viewset for Duo authenticator devices""" 239 240 queryset = DuoDevice.objects.all() 241 serializer_class = DuoDeviceSerializer 242 search_fields = ["name"] 243 filterset_fields = ["name"] 244 ordering = ["name"] 245 owner_field = "user" 246 247 248class DuoAdminDeviceViewSet(ModelViewSet): 249 """Viewset for Duo authenticator devices (for admins)""" 250 251 queryset = DuoDevice.objects.all() 252 serializer_class = DuoDeviceSerializer 253 search_fields = ["name"] 254 filterset_fields = ["name"] 255 ordering = ["name"]
34class AuthenticatorDuoStageSerializer(StageSerializer): 35 """AuthenticatorDuoStage Serializer""" 36 37 class Meta: 38 model = AuthenticatorDuoStage 39 fields = StageSerializer.Meta.fields + [ 40 "configure_flow", 41 "friendly_name", 42 "client_id", 43 "client_secret", 44 "api_hostname", 45 "admin_integration_key", 46 "admin_secret_key", 47 ] 48 extra_kwargs = { 49 "client_secret": {"write_only": True}, 50 "admin_secret_key": {"write_only": True}, 51 }
AuthenticatorDuoStage Serializer
Inherited Members
37 class Meta: 38 model = AuthenticatorDuoStage 39 fields = StageSerializer.Meta.fields + [ 40 "configure_flow", 41 "friendly_name", 42 "client_id", 43 "client_secret", 44 "api_hostname", 45 "admin_integration_key", 46 "admin_secret_key", 47 ] 48 extra_kwargs = { 49 "client_secret": {"write_only": True}, 50 "admin_secret_key": {"write_only": True}, 51 }
54class AuthenticatorDuoStageManualDeviceImport(Serializer): 55 duo_user_id = CharField(required=True) 56 username = CharField(required=True)
The BaseSerializer class provides a minimal class which may be used for writing custom serializer implementations.
Note that we strongly restrict the ordering of operations/properties that may be used on the serializer in order to enforce correct usage.
In particular, if a data= argument is passed then:
.is_valid() - Available.
.initial_data - Available.
.validated_data - Only available after calling is_valid()
.errors - Only available after calling is_valid()
.data - Only available after calling is_valid()
If a data= argument is not passed then:
.is_valid() - Not available. .initial_data - Not available. .validated_data - Not available. .errors - Not available. .data - Available.
59class AuthenticatorDuoStageViewSet(UsedByMixin, ModelViewSet): 60 """AuthenticatorDuoStage Viewset""" 61 62 queryset = AuthenticatorDuoStage.objects.all() 63 serializer_class = AuthenticatorDuoStageSerializer 64 filterset_fields = [ 65 "name", 66 "configure_flow", 67 "client_id", 68 "api_hostname", 69 ] 70 search_fields = ["name"] 71 ordering = ["name"] 72 73 @extend_schema( 74 request=OpenApiTypes.NONE, 75 responses={ 76 200: inline_serializer( 77 "DuoDeviceEnrollmentStatusSerializer", 78 { 79 "duo_response": ChoiceField( 80 ( 81 ("success", "Success"), 82 ("waiting", "Waiting"), 83 ("invalid", "Invalid"), 84 ) 85 ) 86 }, 87 ), 88 }, 89 ) 90 @action( 91 methods=["POST"], 92 detail=True, 93 authentication_classes=[FlowActive], 94 permission_classes=[AllowAny], 95 ) 96 def enrollment_status(self, request: Request, pk: str) -> Response: 97 """Check enrollment status of user details in current session""" 98 stage: AuthenticatorDuoStage = AuthenticatorDuoStage.objects.filter(pk=pk).first() 99 if not stage: 100 raise Http404 101 client = stage.auth_client() 102 plan: FlowPlan = request.auth 103 enroll = plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 104 if not enroll: 105 return Response(status=400) 106 status = client.enroll_status(enroll["user_id"], enroll["activation_code"]) 107 return Response({"duo_response": status}) 108 109 @permission_required( 110 None, ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 111 ) 112 @extend_schema( 113 request=AuthenticatorDuoStageManualDeviceImport(), 114 responses={ 115 204: OpenApiResponse(description="Enrollment successful"), 116 400: OpenApiResponse(description="Bad request"), 117 }, 118 ) 119 @action(methods=["POST"], detail=True) 120 @validate(AuthenticatorDuoStageManualDeviceImport) 121 def import_device_manual( 122 self, request: Request, pk: str, body: AuthenticatorDuoStageManualDeviceImport 123 ) -> Response: 124 """Import duo devices into authentik""" 125 stage: AuthenticatorDuoStage = self.get_object() 126 user = ( 127 get_objects_for_user(request.user, "authentik_core.view_user") 128 .filter(username=body.validated_data.get("username", "")) 129 .first() 130 ) 131 if not user: 132 return Response(data={"non_field_errors": ["User does not exist."]}, status=400) 133 device = DuoDevice.objects.filter( 134 duo_user_id=body.validated_data.get("duo_user_id"), user=user, stage=stage 135 ).first() 136 if device: 137 return Response(data={"non_field_errors": ["Device exists already."]}, status=400) 138 DuoDevice.objects.create( 139 duo_user_id=body.validated_data.get("duo_user_id"), 140 user=user, 141 stage=stage, 142 confirmed=True, 143 name="Imported Duo Authenticator", 144 ) 145 return Response(status=204) 146 147 @permission_required( 148 "", ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 149 ) 150 @extend_schema( 151 request=None, 152 responses={ 153 200: inline_serializer( 154 "AuthenticatorDuoStageDeviceImportResponse", 155 fields={ 156 "count": IntegerField(read_only=True), 157 "error": CharField(read_only=True), 158 }, 159 ), 160 400: OpenApiResponse(description="Bad request"), 161 }, 162 ) 163 @action(methods=["POST"], detail=True) 164 def import_devices_automatic(self, request: Request, pk: str) -> Response: 165 """Import duo devices into authentik""" 166 stage: AuthenticatorDuoStage = self.get_object() 167 if stage.admin_integration_key == "": 168 return Response( 169 data={ 170 "non_field_errors": [ 171 "Stage does not have Admin API configured, " 172 "which is required for automatic imports." 173 ] 174 }, 175 status=400, 176 ) 177 result = self._duo_import_devices(stage) 178 return Response(data=result, status=200 if result["error"] == "" else 400) 179 180 def _duo_import_devices(self, stage: AuthenticatorDuoStage) -> dict[str, Any]: 181 """ 182 Import duo devices. This used to be a blocking task. 183 """ 184 created = 0 185 if stage.admin_integration_key == "": 186 LOGGER.info("Stage does not have admin integration configured", stage=stage) 187 return {"error": "Stage does not have admin integration configured", "count": created} 188 client = stage.admin_client() 189 try: 190 for duo_user in client.get_users_iterator(): 191 user_id = duo_user.get("user_id") 192 username = duo_user.get("username") 193 194 user = User.objects.filter(username=username).first() 195 if not user: 196 LOGGER.debug("User not found", username=username) 197 continue 198 device = DuoDevice.objects.filter( 199 duo_user_id=user_id, user=user, stage=stage 200 ).first() 201 if device: 202 LOGGER.debug("User already has a device with ID", id=user_id) 203 continue 204 DuoDevice.objects.create( 205 duo_user_id=user_id, 206 user=user, 207 stage=stage, 208 name="Imported Duo Authenticator", 209 ) 210 created += 1 211 return {"error": "", "count": created} 212 except RuntimeError as exc: 213 LOGGER.warning("failed to get users from duo", exc=exc) 214 return { 215 "error": "An internal error occurred while importing devices.", 216 "count": created, 217 }
AuthenticatorDuoStage Viewset
73 @extend_schema( 74 request=OpenApiTypes.NONE, 75 responses={ 76 200: inline_serializer( 77 "DuoDeviceEnrollmentStatusSerializer", 78 { 79 "duo_response": ChoiceField( 80 ( 81 ("success", "Success"), 82 ("waiting", "Waiting"), 83 ("invalid", "Invalid"), 84 ) 85 ) 86 }, 87 ), 88 }, 89 ) 90 @action( 91 methods=["POST"], 92 detail=True, 93 authentication_classes=[FlowActive], 94 permission_classes=[AllowAny], 95 ) 96 def enrollment_status(self, request: Request, pk: str) -> Response: 97 """Check enrollment status of user details in current session""" 98 stage: AuthenticatorDuoStage = AuthenticatorDuoStage.objects.filter(pk=pk).first() 99 if not stage: 100 raise Http404 101 client = stage.auth_client() 102 plan: FlowPlan = request.auth 103 enroll = plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 104 if not enroll: 105 return Response(status=400) 106 status = client.enroll_status(enroll["user_id"], enroll["activation_code"]) 107 return Response({"duo_response": status})
Check enrollment status of user details in current session
109 @permission_required( 110 None, ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 111 ) 112 @extend_schema( 113 request=AuthenticatorDuoStageManualDeviceImport(), 114 responses={ 115 204: OpenApiResponse(description="Enrollment successful"), 116 400: OpenApiResponse(description="Bad request"), 117 }, 118 ) 119 @action(methods=["POST"], detail=True) 120 @validate(AuthenticatorDuoStageManualDeviceImport) 121 def import_device_manual( 122 self, request: Request, pk: str, body: AuthenticatorDuoStageManualDeviceImport 123 ) -> Response: 124 """Import duo devices into authentik""" 125 stage: AuthenticatorDuoStage = self.get_object() 126 user = ( 127 get_objects_for_user(request.user, "authentik_core.view_user") 128 .filter(username=body.validated_data.get("username", "")) 129 .first() 130 ) 131 if not user: 132 return Response(data={"non_field_errors": ["User does not exist."]}, status=400) 133 device = DuoDevice.objects.filter( 134 duo_user_id=body.validated_data.get("duo_user_id"), user=user, stage=stage 135 ).first() 136 if device: 137 return Response(data={"non_field_errors": ["Device exists already."]}, status=400) 138 DuoDevice.objects.create( 139 duo_user_id=body.validated_data.get("duo_user_id"), 140 user=user, 141 stage=stage, 142 confirmed=True, 143 name="Imported Duo Authenticator", 144 ) 145 return Response(status=204)
Import duo devices into authentik
147 @permission_required( 148 "", ["authentik_stages_authenticator_duo.add_duodevice", "authentik_core.view_user"] 149 ) 150 @extend_schema( 151 request=None, 152 responses={ 153 200: inline_serializer( 154 "AuthenticatorDuoStageDeviceImportResponse", 155 fields={ 156 "count": IntegerField(read_only=True), 157 "error": CharField(read_only=True), 158 }, 159 ), 160 400: OpenApiResponse(description="Bad request"), 161 }, 162 ) 163 @action(methods=["POST"], detail=True) 164 def import_devices_automatic(self, request: Request, pk: str) -> Response: 165 """Import duo devices into authentik""" 166 stage: AuthenticatorDuoStage = self.get_object() 167 if stage.admin_integration_key == "": 168 return Response( 169 data={ 170 "non_field_errors": [ 171 "Stage does not have Admin API configured, " 172 "which is required for automatic imports." 173 ] 174 }, 175 status=400, 176 ) 177 result = self._duo_import_devices(stage) 178 return Response(data=result, status=200 if result["error"] == "" else 400)
Import duo devices into authentik
Inherited Members
220class DuoDeviceSerializer(ModelSerializer): 221 """Serializer for Duo authenticator devices""" 222 223 user = PartialUserSerializer(read_only=True) 224 225 class Meta: 226 model = DuoDevice 227 fields = ["pk", "name", "user"] 228 depth = 2
Serializer for Duo authenticator devices
Inherited Members
231class DuoDeviceViewSet( 232 mixins.RetrieveModelMixin, 233 mixins.UpdateModelMixin, 234 mixins.DestroyModelMixin, 235 UsedByMixin, 236 mixins.ListModelMixin, 237 GenericViewSet, 238): 239 """Viewset for Duo authenticator devices""" 240 241 queryset = DuoDevice.objects.all() 242 serializer_class = DuoDeviceSerializer 243 search_fields = ["name"] 244 filterset_fields = ["name"] 245 ordering = ["name"] 246 owner_field = "user"
Viewset for Duo authenticator devices
Inherited Members
249class DuoAdminDeviceViewSet(ModelViewSet): 250 """Viewset for Duo authenticator devices (for admins)""" 251 252 queryset = DuoDevice.objects.all() 253 serializer_class = DuoDeviceSerializer 254 search_fields = ["name"] 255 filterset_fields = ["name"] 256 ordering = ["name"]
Viewset for Duo authenticator devices (for admins)