authentik.stages.authenticator_static.stage

Static OTP Setup stage

 1"""Static OTP Setup stage"""
 2
 3from django.http import HttpRequest, HttpResponse
 4from rest_framework.fields import CharField, ListField
 5
 6from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge
 7from authentik.flows.stage import ChallengeStageView
 8from authentik.lib.generators import generate_id
 9from authentik.stages.authenticator_static.models import (
10    AuthenticatorStaticStage,
11    StaticDevice,
12    StaticToken,
13)
14
15SESSION_STATIC_DEVICE = "static_device"
16SESSION_STATIC_TOKENS = "static_device_tokens"
17
18
class AuthenticatorStaticChallenge(WithUserInfoChallenge):
    """Challenge serializer for the static authenticator setup stage."""

    # List of strings; the stage view fills this with the `token` value of
    # every pending StaticToken.
    codes = ListField(child=CharField())
    # Component identifier; defaults to the static authenticator stage.
    component = CharField(default="ak-stage-authenticator-static")
24
25
class AuthenticatorStaticChallengeResponse(ChallengeResponse):
    """Response serializer for the static stage; only declares `component`."""

    # Component identifier; same default as the matching challenge.
    component = CharField(default="ak-stage-authenticator-static")
30
31
class AuthenticatorStaticStageView(ChallengeStageView):
    """Static OTP setup stage.

    On GET an unsaved StaticDevice and its StaticToken objects are created and
    kept in the session; once the challenge response is accepted they are
    written to the database and removed from the session.
    """

    response_class = AuthenticatorStaticChallengeResponse

    def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge:
        """Build the challenge from the token strings currently held in the session."""
        pending_tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS]
        codes = [pending.token for pending in pending_tokens]
        return AuthenticatorStaticChallenge(data={"codes": codes})

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Prepare the pending device and tokens, then defer to the parent `get`.

        Without an authenticated pending user the stage is passed straight
        away. An existing device in the session is reused, so repeated GETs
        keep showing the same tokens.
        """
        user = self.get_pending_user()
        if not user.is_authenticated:
            self.logger.debug("No pending user, continuing")
            return self.executor.stage_ok()

        stage: AuthenticatorStaticStage = self.executor.current_stage
        session = self.request.session

        if SESSION_STATIC_DEVICE not in session:
            # Neither the device nor the tokens are saved here; that only
            # happens in challenge_valid.
            device = StaticDevice(user=user, confirmed=False, name="Static Token")
            generated = [
                StaticToken(device=device, token=generate_id(length=stage.token_length))
                for _ in range(stage.token_count)
            ]
            session[SESSION_STATIC_DEVICE] = device
            session[SESSION_STATIC_TOKENS] = generated
        return super().get(request, *args, **kwargs)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Confirm and save the session-held device and tokens, then finish the stage."""
        session = self.request.session
        device: StaticDevice = session[SESSION_STATIC_DEVICE]
        device.confirmed = True
        # The device is saved before its tokens, which reference it.
        device.save()
        for pending in session[SESSION_STATIC_TOKENS]:
            pending.save()
        # Drop the setup state from the session once everything is stored.
        del session[SESSION_STATIC_DEVICE]
        del session[SESSION_STATIC_TOKENS]
        return self.executor.stage_ok()
SESSION_STATIC_DEVICE = 'static_device'
SESSION_STATIC_TOKENS = 'static_device_tokens'
class AuthenticatorStaticChallenge(authentik.flows.challenge.WithUserInfoChallenge):
20class AuthenticatorStaticChallenge(WithUserInfoChallenge):
21    """Static authenticator challenge"""
22
23    codes = ListField(child=CharField())
24    component = CharField(default="ak-stage-authenticator-static")

Static authenticator challenge

codes
component
class AuthenticatorStaticChallengeResponse(authentik.flows.challenge.ChallengeResponse):
27class AuthenticatorStaticChallengeResponse(ChallengeResponse):
28    """Pseudo class for static response"""
29
30    component = CharField(default="ak-stage-authenticator-static")

Pseudo class for static response

component
class AuthenticatorStaticStageView(authentik.flows.stage.ChallengeStageView):
33class AuthenticatorStaticStageView(ChallengeStageView):
34    """Static OTP Setup stage"""
35
36    response_class = AuthenticatorStaticChallengeResponse
37
38    def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge:
39        tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS]
40        return AuthenticatorStaticChallenge(
41            data={
42                "codes": [token.token for token in tokens],
43            }
44        )
45
46    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
47        user = self.get_pending_user()
48        if not user.is_authenticated:
49            self.logger.debug("No pending user, continuing")
50            return self.executor.stage_ok()
51
52        stage: AuthenticatorStaticStage = self.executor.current_stage
53
54        if SESSION_STATIC_DEVICE not in self.request.session:
55            device = StaticDevice(user=user, confirmed=False, name="Static Token")
56            tokens = []
57            for _ in range(0, stage.token_count):
58                tokens.append(
59                    StaticToken(device=device, token=generate_id(length=stage.token_length))
60                )
61            self.request.session[SESSION_STATIC_DEVICE] = device
62            self.request.session[SESSION_STATIC_TOKENS] = tokens
63        return super().get(request, *args, **kwargs)
64
65    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
66        """Verify OTP Token"""
67        device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE]
68        device.confirmed = True
69        device.save()
70        for token in self.request.session[SESSION_STATIC_TOKENS]:
71            token.save()
72        del self.request.session[SESSION_STATIC_DEVICE]
73        del self.request.session[SESSION_STATIC_TOKENS]
74        return self.executor.stage_ok()

Static OTP Setup stage

response_class = <class 'AuthenticatorStaticChallengeResponse'>
def get_challenge( self, *args, **kwargs) -> AuthenticatorStaticChallenge:
38    def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge:
39        tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS]
40        return AuthenticatorStaticChallenge(
41            data={
42                "codes": [token.token for token in tokens],
43            }
44        )

Return the challenge that the client should solve

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
46    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
47        user = self.get_pending_user()
48        if not user.is_authenticated:
49            self.logger.debug("No pending user, continuing")
50            return self.executor.stage_ok()
51
52        stage: AuthenticatorStaticStage = self.executor.current_stage
53
54        if SESSION_STATIC_DEVICE not in self.request.session:
55            device = StaticDevice(user=user, confirmed=False, name="Static Token")
56            tokens = []
57            for _ in range(0, stage.token_count):
58                tokens.append(
59                    StaticToken(device=device, token=generate_id(length=stage.token_length))
60                )
61            self.request.session[SESSION_STATIC_DEVICE] = device
62            self.request.session[SESSION_STATIC_TOKENS] = tokens
63        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
65    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
66        """Verify OTP Token"""
67        device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE]
68        device.confirmed = True
69        device.save()
70        for token in self.request.session[SESSION_STATIC_TOKENS]:
71            token.save()
72        del self.request.session[SESSION_STATIC_DEVICE]
73        del self.request.session[SESSION_STATIC_TOKENS]
74        return self.executor.stage_ok()

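Mark the session-held static device as confirmed, save it together with its tokens, clear the setup state from the session, and report the stage as successful.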