authentik.stages.authenticator_duo.stage

Duo stage

 1"""Duo stage"""
 2
 3from django.http import HttpResponse
 4from django.utils.timezone import now
 5from rest_framework.fields import CharField
 6
 7from authentik.events.models import Event, EventAction
 8from authentik.flows.challenge import (
 9    Challenge,
10    ChallengeResponse,
11    WithUserInfoChallenge,
12)
13from authentik.flows.stage import ChallengeStageView
14from authentik.flows.views.executor import InvalidStageError
15from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
16
17PLAN_CONTEXT_DUO_ENROLL = "goauthentik.io/stages/authenticator_duo/enroll"
18
19
20class AuthenticatorDuoChallenge(WithUserInfoChallenge):
21    """Duo Challenge"""
22
23    activation_barcode = CharField()
24    activation_code = CharField()
25    stage_uuid = CharField()
26    component = CharField(default="ak-stage-authenticator-duo")
27
28
29class AuthenticatorDuoChallengeResponse(ChallengeResponse):
30    """Pseudo class for duo response"""
31
32    component = CharField(default="ak-stage-authenticator-duo")
33
34
35class AuthenticatorDuoStageView(ChallengeStageView):
36    """Duo stage"""
37
38    response_class = AuthenticatorDuoChallengeResponse
39
40    def duo_enroll(self):
41        """Enroll User with Duo API and save results"""
42        user = self.get_pending_user()
43        stage: AuthenticatorDuoStage = self.executor.current_stage
44        try:
45            enroll = stage.auth_client().enroll(user.username)
46        except RuntimeError as exc:
47            Event.new(
48                EventAction.CONFIGURATION_ERROR,
49                message=f"Failed to enroll user: {str(exc)}",
50                user=user,
51            ).from_http(self.request, user)
52            raise InvalidStageError(str(exc)) from exc
53        self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll
54        return enroll
55
56    def get_challenge(self, *args, **kwargs) -> Challenge:
57        stage: AuthenticatorDuoStage = self.executor.current_stage
58        if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context:
59            self.duo_enroll()
60        enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL]
61        return AuthenticatorDuoChallenge(
62            data={
63                "activation_barcode": enroll["activation_barcode"],
64                "activation_code": enroll["activation_code"],
65                "stage_uuid": str(stage.stage_uuid),
66            }
67        )
68
69    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
70        # Duo Challenge has already been validated
71        stage: AuthenticatorDuoStage = self.executor.current_stage
72        enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL)
73        enroll_status = stage.auth_client().enroll_status(
74            enroll["user_id"], enroll["activation_code"]
75        )
76        if enroll_status != "success":
77            return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.")
78        existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first()
79        self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL)
80        if not existing_device:
81            DuoDevice.objects.create(
82                name="Duo Authenticator",
83                user=self.get_pending_user(),
84                duo_user_id=enroll["user_id"],
85                stage=stage,
86                last_t=now(),
87            )
88        else:
89            return self.executor.stage_invalid("Device with Credential ID already exists.")
90        return self.executor.stage_ok()
PLAN_CONTEXT_DUO_ENROLL = 'goauthentik.io/stages/authenticator_duo/enroll'
class AuthenticatorDuoChallenge(authentik.flows.challenge.WithUserInfoChallenge):
21class AuthenticatorDuoChallenge(WithUserInfoChallenge):
22    """Duo Challenge"""
23
24    activation_barcode = CharField()
25    activation_code = CharField()
26    stage_uuid = CharField()
27    component = CharField(default="ak-stage-authenticator-duo")

Duo Challenge

activation_barcode
activation_code
stage_uuid
component
class AuthenticatorDuoChallengeResponse(authentik.flows.challenge.ChallengeResponse):
30class AuthenticatorDuoChallengeResponse(ChallengeResponse):
31    """Pseudo class for duo response"""
32
33    component = CharField(default="ak-stage-authenticator-duo")

Pseudo class for duo response

component
class AuthenticatorDuoStageView(authentik.flows.stage.ChallengeStageView):
36class AuthenticatorDuoStageView(ChallengeStageView):
37    """Duo stage"""
38
39    response_class = AuthenticatorDuoChallengeResponse
40
41    def duo_enroll(self):
42        """Enroll User with Duo API and save results"""
43        user = self.get_pending_user()
44        stage: AuthenticatorDuoStage = self.executor.current_stage
45        try:
46            enroll = stage.auth_client().enroll(user.username)
47        except RuntimeError as exc:
48            Event.new(
49                EventAction.CONFIGURATION_ERROR,
50                message=f"Failed to enroll user: {str(exc)}",
51                user=user,
52            ).from_http(self.request, user)
53            raise InvalidStageError(str(exc)) from exc
54        self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll
55        return enroll
56
57    def get_challenge(self, *args, **kwargs) -> Challenge:
58        stage: AuthenticatorDuoStage = self.executor.current_stage
59        if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context:
60            self.duo_enroll()
61        enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL]
62        return AuthenticatorDuoChallenge(
63            data={
64                "activation_barcode": enroll["activation_barcode"],
65                "activation_code": enroll["activation_code"],
66                "stage_uuid": str(stage.stage_uuid),
67            }
68        )
69
70    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
71        # Duo Challenge has already been validated
72        stage: AuthenticatorDuoStage = self.executor.current_stage
73        enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL)
74        enroll_status = stage.auth_client().enroll_status(
75            enroll["user_id"], enroll["activation_code"]
76        )
77        if enroll_status != "success":
78            return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.")
79        existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first()
80        self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL)
81        if not existing_device:
82            DuoDevice.objects.create(
83                name="Duo Authenticator",
84                user=self.get_pending_user(),
85                duo_user_id=enroll["user_id"],
86                stage=stage,
87                last_t=now(),
88            )
89        else:
90            return self.executor.stage_invalid("Device with Credential ID already exists.")
91        return self.executor.stage_ok()

Duo stage

response_class = <class 'AuthenticatorDuoChallengeResponse'>
def duo_enroll(self):
41    def duo_enroll(self):
42        """Enroll User with Duo API and save results"""
43        user = self.get_pending_user()
44        stage: AuthenticatorDuoStage = self.executor.current_stage
45        try:
46            enroll = stage.auth_client().enroll(user.username)
47        except RuntimeError as exc:
48            Event.new(
49                EventAction.CONFIGURATION_ERROR,
50                message=f"Failed to enroll user: {str(exc)}",
51                user=user,
52            ).from_http(self.request, user)
53            raise InvalidStageError(str(exc)) from exc
54        self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL] = enroll
55        return enroll

Enroll User with Duo API and save results

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
57    def get_challenge(self, *args, **kwargs) -> Challenge:
58        stage: AuthenticatorDuoStage = self.executor.current_stage
59        if PLAN_CONTEXT_DUO_ENROLL not in self.executor.plan.context:
60            self.duo_enroll()
61        enroll = self.executor.plan.context[PLAN_CONTEXT_DUO_ENROLL]
62        return AuthenticatorDuoChallenge(
63            data={
64                "activation_barcode": enroll["activation_barcode"],
65                "activation_code": enroll["activation_code"],
66                "stage_uuid": str(stage.stage_uuid),
67            }
68        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
70    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
71        # Duo Challenge has already been validated
72        stage: AuthenticatorDuoStage = self.executor.current_stage
73        enroll = self.executor.plan.context.get(PLAN_CONTEXT_DUO_ENROLL)
74        enroll_status = stage.auth_client().enroll_status(
75            enroll["user_id"], enroll["activation_code"]
76        )
77        if enroll_status != "success":
78            return self.executor.stage_invalid(f"Invalid enrollment status: {enroll_status}.")
79        existing_device = DuoDevice.objects.filter(duo_user_id=enroll["user_id"]).first()
80        self.executor.plan.context.pop(PLAN_CONTEXT_DUO_ENROLL)
81        if not existing_device:
82            DuoDevice.objects.create(
83                name="Duo Authenticator",
84                user=self.get_pending_user(),
85                duo_user_id=enroll["user_id"],
86                stage=stage,
87                last_t=now(),
88            )
89        else:
90            return self.executor.stage_invalid("Device with Credential ID already exists.")
91        return self.executor.stage_ok()

Callback when the challenge has the correct format