authentik.stages.authenticator_duo.stage
Duo stage
1"""Duo stage""" 2 3from django.http import HttpResponse 4from django.utils.timezone import now 5from rest_framework.fields import CharField 6 7from authentik.events.models import Event, EventAction 8from authentik.flows.challenge import ( 9 Challenge, 10 ChallengeResponse, 11 WithUserInfoChallenge, 12) 13from authentik.flows.stage import ChallengeStageView 14from authentik.flows.views.executor import InvalidStageError 15from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice 16 17PLAN_CONTEXT_DUO_ENROLL = "goauthentik.io/stages/authenticator_duo/enroll" 18 19 20class AuthenticatorDuoChallenge(WithUserInfoChallenge): 21 """Duo Challenge""" 22 23 activation_barcode = CharField() 24 activation_code = CharField() 25 stage_uuid = CharField() 26 component = CharField(default="ak-stage-authenticator-duo") 27 28 29class AuthenticatorDuoChallengeResponse(ChallengeResponse): 30 """Pseudo class for duo response""" 31 32 component = CharField(default="ak-stage-authenticator-duo") 33 34 35class AuthenticatorDuoStageView(ChallengeStageView): 36 """Duo stage""" 37 38 response_class = AuthenticatorDuoChallengeResponse 39 40 def duo_enroll(self): 41 """Enroll User with Duo API and save results""" 42 user = self.get_pending_user() 43 stage: AuthenticatorDuoStage = self.executor.current_stage 44 try: 45 enroll = stage.auth_client().enroll(user.username) 46 except RuntimeError as exc: 47 Event.new( 48 EventAction.CONFIGURATION_ERROR, 49 message=f"Failed to enroll user: {str(exc)}", 50 user=user, 51 ).from_http(self.request, user) 52 raise InvalidStageError(str(exc)) from exc 53 self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll 54 return enroll 55 56 def get_challenge(self, *args, **kwargs) -> Challenge: 57 stage: AuthenticatorDuoStage = self.executor.current_stage 58 if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context: 59 self.duo_enroll() 60 enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] 61 return AuthenticatorDuoChallenge( 62 data={ 63 "activation_barcode": enroll["activation_barcode"], 64 "activation_code": enroll["activation_code"], 65 "stage_uuid": str(stage.stage_uuid), 66 } 67 ) 68 69 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 70 # Duo Challenge has already been validated 71 stage: AuthenticatorDuoStage = self.executor.current_stage 72 enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 73 enroll_status = stage.auth_client().enroll_status( 74 enroll["user_id"], enroll["activation_code"] 75 ) 76 if enroll_status != "success": 77 return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.") 78 existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first() 79 self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL) 80 if not existing_device: 81 DuoDevice.objects.create( 82 name="Duo Authenticator", 83 user=self.get_pending_user(), 84 duo_user_id=enroll["user_id"], 85 stage=stage, 86 last_t=now(), 87 ) 88 else: 89 return self.executor.stage_invalid("Device with Credential ID already exists.") 90 return self.executor.stage_ok()
PLAN_CONTEXT_DUO_ENROLL =
'goauthentik.io/stages/authenticator_duo/enroll'
21class AuthenticatorDuoChallenge(WithUserInfoChallenge): 22 """Duo Challenge""" 23 24 activation_barcode = CharField() 25 activation_code = CharField() 26 stage_uuid = CharField() 27 component = CharField(default="ak-stage-authenticator-duo")
Duo Challenge
30class AuthenticatorDuoChallengeResponse(ChallengeResponse): 31 """Pseudo class for duo response""" 32 33 component = CharField(default="ak-stage-authenticator-duo")
Pseudo class for duo response
36class AuthenticatorDuoStageView(ChallengeStageView): 37 """Duo stage""" 38 39 response_class = AuthenticatorDuoChallengeResponse 40 41 def duo_enroll(self): 42 """Enroll User with Duo API and save results""" 43 user = self.get_pending_user() 44 stage: AuthenticatorDuoStage = self.executor.current_stage 45 try: 46 enroll = stage.auth_client().enroll(user.username) 47 except RuntimeError as exc: 48 Event.new( 49 EventAction.CONFIGURATION_ERROR, 50 message=f"Failed to enroll user: {str(exc)}", 51 user=user, 52 ).from_http(self.request, user) 53 raise InvalidStageError(str(exc)) from exc 54 self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll 55 return enroll 56 57 def get_challenge(self, *args, **kwargs) -> Challenge: 58 stage: AuthenticatorDuoStage = self.executor.current_stage 59 if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context: 60 self.duo_enroll() 61 enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] 62 return AuthenticatorDuoChallenge( 63 data={ 64 "activation_barcode": enroll["activation_barcode"], 65 "activation_code": enroll["activation_code"], 66 "stage_uuid": str(stage.stage_uuid), 67 } 68 ) 69 70 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 71 # Duo Challenge has already been validated 72 stage: AuthenticatorDuoStage = self.executor.current_stage 73 enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 74 enroll_status = stage.auth_client().enroll_status( 75 enroll["user_id"], enroll["activation_code"] 76 ) 77 if enroll_status != "success": 78 return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.") 79 existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first() 80 self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL) 81 if not existing_device: 82 DuoDevice.objects.create( 83 name="Duo Authenticator", 84 user=self.get_pending_user(), 85 duo_user_id=enroll["user_id"], 86 stage=stage, 87 last_t=now(), 88 ) 89 else: 90 return self.executor.stage_invalid("Device with Credential ID already exists.") 91 return self.executor.stage_ok()
Duo stage
response_class =
<class 'AuthenticatorDuoChallengeResponse'>
def
duo_enroll(self):
41 def duo_enroll(self): 42 """Enroll User with Duo API and save results""" 43 user = self.get_pending_user() 44 stage: AuthenticatorDuoStage = self.executor.current_stage 45 try: 46 enroll = stage.auth_client().enroll(user.username) 47 except RuntimeError as exc: 48 Event.new( 49 EventAction.CONFIGURATION_ERROR, 50 message=f"Failed to enroll user: {str(exc)}", 51 user=user, 52 ).from_http(self.request, user) 53 raise InvalidStageError(str(exc)) from exc 54 self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll 55 return enroll
Enroll User with Duo API and save results
57 def get_challenge(self, *args, **kwargs) -> Challenge: 58 stage: AuthenticatorDuoStage = self.executor.current_stage 59 if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context: 60 self.duo_enroll() 61 enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] 62 return AuthenticatorDuoChallenge( 63 data={ 64 "activation_barcode": enroll["activation_barcode"], 65 "activation_code": enroll["activation_code"], 66 "stage_uuid": str(stage.stage_uuid), 67 } 68 )
Return the challenge that the client should solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
70 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 71 # Duo Challenge has already been validated 72 stage: AuthenticatorDuoStage = self.executor.current_stage 73 enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL) 74 enroll_status = stage.auth_client().enroll_status( 75 enroll["user_id"], enroll["activation_code"] 76 ) 77 if enroll_status != "success": 78 return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.") 79 existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first() 80 self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL) 81 if not existing_device: 82 DuoDevice.objects.create( 83 name="Duo Authenticator", 84 user=self.get_pending_user(), 85 duo_user_id=enroll["user_id"], 86 stage=stage, 87 last_t=now(), 88 ) 89 else: 90 return self.executor.stage_invalid("Device with Credential ID already exists.") 91 return self.executor.stage_ok()
Callback when the challenge has the correct format