authentik.stages.authenticator_static.stage
Static OTP Setup stage
1"""Static OTP Setup stage""" 2 3from django.http import HttpRequest, HttpResponse 4from rest_framework.fields import CharField, ListField 5 6from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge 7from authentik.flows.stage import ChallengeStageView 8from authentik.lib.generators import generate_id 9from authentik.stages.authenticator_static.models import ( 10 AuthenticatorStaticStage, 11 StaticDevice, 12 StaticToken, 13) 14 15SESSION_STATIC_DEVICE = "static_device" 16SESSION_STATIC_TOKENS = "static_device_tokens" 17 18 19class AuthenticatorStaticChallenge(WithUserInfoChallenge): 20 """Static authenticator challenge""" 21 22 codes = ListField(child=CharField()) 23 component = CharField(default="ak-stage-authenticator-static") 24 25 26class AuthenticatorStaticChallengeResponse(ChallengeResponse): 27 """Pseudo class for static response""" 28 29 component = CharField(default="ak-stage-authenticator-static") 30 31 32class AuthenticatorStaticStageView(ChallengeStageView): 33 """Static OTP Setup stage""" 34 35 response_class = AuthenticatorStaticChallengeResponse 36 37 def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge: 38 tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS] 39 return AuthenticatorStaticChallenge( 40 data={ 41 "codes": [token.token for token in tokens], 42 } 43 ) 44 45 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 46 user = self.get_pending_user() 47 if not user.is_authenticated: 48 self.logger.debug("No pending user, continuing") 49 return self.executor.stage_ok() 50 51 stage: AuthenticatorStaticStage = self.executor.current_stage 52 53 if SESSION_STATIC_DEVICE not in self.request.session: 54 device = StaticDevice(user=user, confirmed=False, name="Static Token") 55 tokens = [] 56 for _ in range(0, stage.token_count): 57 tokens.append( 58 StaticToken(device=device, token=generate_id(length=stage.token_length)) 59 ) 60 self.request.session[SESSION_STATIC_DEVICE] = device 61 self.request.session[SESSION_STATIC_TOKENS] = tokens 62 return super().get(request, *args, **kwargs) 63 64 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 65 """Verify OTP Token""" 66 device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE] 67 device.confirmed = True 68 device.save() 69 for token in self.request.session[SESSION_STATIC_TOKENS]: 70 token.save() 71 del self.request.session[SESSION_STATIC_DEVICE] 72 del self.request.session[SESSION_STATIC_TOKENS] 73 return self.executor.stage_ok()
SESSION_STATIC_DEVICE =
'static_device'
SESSION_STATIC_TOKENS =
'static_device_tokens'
20class AuthenticatorStaticChallenge(WithUserInfoChallenge): 21 """Static authenticator challenge""" 22 23 codes = ListField(child=CharField()) 24 component = CharField(default="ak-stage-authenticator-static")
Static authenticator challenge
27class AuthenticatorStaticChallengeResponse(ChallengeResponse): 28 """Pseudo class for static response""" 29 30 component = CharField(default="ak-stage-authenticator-static")
Pseudo class for static response
33class AuthenticatorStaticStageView(ChallengeStageView): 34 """Static OTP Setup stage""" 35 36 response_class = AuthenticatorStaticChallengeResponse 37 38 def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge: 39 tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS] 40 return AuthenticatorStaticChallenge( 41 data={ 42 "codes": [token.token for token in tokens], 43 } 44 ) 45 46 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 47 user = self.get_pending_user() 48 if not user.is_authenticated: 49 self.logger.debug("No pending user, continuing") 50 return self.executor.stage_ok() 51 52 stage: AuthenticatorStaticStage = self.executor.current_stage 53 54 if SESSION_STATIC_DEVICE not in self.request.session: 55 device = StaticDevice(user=user, confirmed=False, name="Static Token") 56 tokens = [] 57 for _ in range(0, stage.token_count): 58 tokens.append( 59 StaticToken(device=device, token=generate_id(length=stage.token_length)) 60 ) 61 self.request.session[SESSION_STATIC_DEVICE] = device 62 self.request.session[SESSION_STATIC_TOKENS] = tokens 63 return super().get(request, *args, **kwargs) 64 65 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 66 """Verify OTP Token""" 67 device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE] 68 device.confirmed = True 69 device.save() 70 for token in self.request.session[SESSION_STATIC_TOKENS]: 71 token.save() 72 del self.request.session[SESSION_STATIC_DEVICE] 73 del self.request.session[SESSION_STATIC_TOKENS] 74 return self.executor.stage_ok()
Static OTP Setup stage
response_class =
<class 'AuthenticatorStaticChallengeResponse'>
38 def get_challenge(self, *args, **kwargs) -> AuthenticatorStaticChallenge: 39 tokens: list[StaticToken] = self.request.session[SESSION_STATIC_TOKENS] 40 return AuthenticatorStaticChallenge( 41 data={ 42 "codes": [token.token for token in tokens], 43 } 44 )
Return the challenge that the client should solve
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
46 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 47 user = self.get_pending_user() 48 if not user.is_authenticated: 49 self.logger.debug("No pending user, continuing") 50 return self.executor.stage_ok() 51 52 stage: AuthenticatorStaticStage = self.executor.current_stage 53 54 if SESSION_STATIC_DEVICE not in self.request.session: 55 device = StaticDevice(user=user, confirmed=False, name="Static Token") 56 tokens = [] 57 for _ in range(0, stage.token_count): 58 tokens.append( 59 StaticToken(device=device, token=generate_id(length=stage.token_length)) 60 ) 61 self.request.session[SESSION_STATIC_DEVICE] = device 62 self.request.session[SESSION_STATIC_TOKENS] = tokens 63 return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
65 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 66 """Verify OTP Token""" 67 device: StaticDevice = self.request.session[SESSION_STATIC_DEVICE] 68 device.confirmed = True 69 device.save() 70 for token in self.request.session[SESSION_STATIC_TOKENS]: 71 token.save() 72 del self.request.session[SESSION_STATIC_DEVICE] 73 del self.request.session[SESSION_STATIC_TOKENS] 74 return self.executor.stage_ok()
Verify OTP Token