authentik.stages.authenticator_sms.stage
SMS Setup stage
1"""SMS Setup stage""" 2 3from django.db.models import Q 4from django.http import HttpRequest, HttpResponse 5from django.http.request import QueryDict 6from django.utils.translation import gettext_lazy as _ 7from rest_framework.exceptions import ValidationError 8from rest_framework.fields import BooleanField, CharField 9 10from authentik.flows.challenge import ( 11 Challenge, 12 ChallengeResponse, 13 WithUserInfoChallenge, 14) 15from authentik.flows.stage import ChallengeStageView 16from authentik.stages.authenticator_sms.models import ( 17 AuthenticatorSMSStage, 18 SMSDevice, 19 hash_phone_number, 20) 21from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 22 23PLAN_CONTEXT_SMS_DEVICE = "goauthentik.io/stages/authenticator_sms/sms_device" 24PLAN_CONTEXT_PHONE = "phone" 25 26 27class AuthenticatorSMSChallenge(WithUserInfoChallenge): 28 """SMS Setup challenge""" 29 30 # Set to true if no previous prompt stage set the phone number 31 # this stage will also check prompt_data.phone 32 phone_number_required = BooleanField(default=True) 33 component = CharField(default="ak-stage-authenticator-sms") 34 35 36class AuthenticatorSMSChallengeResponse(ChallengeResponse): 37 """SMS Challenge response, device is set by get_response_instance""" 38 39 device: SMSDevice 40 41 code = CharField(required=False) 42 phone_number = CharField(required=False) 43 44 component = CharField(default="ak-stage-authenticator-sms") 45 46 def validate(self, attrs: dict) -> dict: 47 """Check""" 48 if "code" not in attrs: 49 if "phone_number" not in attrs: 50 raise ValidationError("phone_number required") 51 self.device.phone_number = attrs["phone_number"] 52 self.stage.validate_and_send(attrs["phone_number"]) 53 return super().validate(attrs) 54 if not self.device.verify_token(str(attrs["code"])): 55 raise ValidationError(_("Code does not match")) 56 self.device.confirmed = True 57 return super().validate(attrs) 58 59 60class AuthenticatorSMSStageView(ChallengeStageView): 61 """OTP sms Setup stage""" 62 63 response_class = AuthenticatorSMSChallengeResponse 64 65 def validate_and_send(self, phone_number: str): 66 """Validate phone number and send message""" 67 stage: AuthenticatorSMSStage = self.executor.current_stage 68 hashed_number = hash_phone_number(phone_number) 69 query = Q(phone_number=hashed_number) | Q(phone_number=phone_number) 70 if SMSDevice.objects.filter(query, stage=stage.pk).exists(): 71 raise ValidationError(_("Invalid phone number")) 72 # No code yet, but we have a phone number, so send a verification message 73 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 74 stage.send(self.request, device.token, device) 75 76 def _has_phone_number(self) -> str | None: 77 context = self.executor.plan.context 78 if PLAN_CONTEXT_PHONE in context.get(PLAN_CONTEXT_PROMPT, {}): 79 self.logger.debug("got phone number from plan context") 80 return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_PHONE) 81 if PLAN_CONTEXT_SMS_DEVICE in self.executor.plan.context: 82 self.logger.debug("got phone number from device in session") 83 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 84 if device.phone_number == "": 85 return None 86 return device.phone_number 87 return None 88 89 def get_challenge(self, *args, **kwargs) -> Challenge: 90 return AuthenticatorSMSChallenge( 91 data={ 92 "phone_number_required": self._has_phone_number() is None, 93 } 94 ) 95 96 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 97 response = super().get_response_instance(data) 98 response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 99 return response 100 101 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 102 user = self.get_pending_user() 103 104 stage: AuthenticatorSMSStage = self.executor.current_stage 105 106 if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context: 107 device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device") 108 device.generate_token(commit=False) 109 self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device 110 if phone_number := self._has_phone_number(): 111 device.phone_number = phone_number 112 try: 113 self.validate_and_send(phone_number) 114 except ValidationError as exc: 115 # We had a phone number given already (at this point only possible from flow 116 # context), but an error occurred while sending a number (most likely) 117 # due to a duplicate device, so delete the number we got given, reset the state 118 # (ish) and retry 119 device.phone_number = "" 120 self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop( 121 PLAN_CONTEXT_PHONE, None 122 ) 123 self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None) 124 self.logger.warning("failed to send SMS message to pre-set number", exc=exc) 125 return self.get(request, *args, **kwargs) 126 return super().get(request, *args, **kwargs) 127 128 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 129 """SMS Token is validated by challenge""" 130 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 131 if not device.confirmed: 132 return self.challenge_invalid(response) 133 stage: AuthenticatorSMSStage = self.executor.current_stage 134 if stage.verify_only: 135 self.logger.debug("Hashing number on device") 136 device.set_hashed_number() 137 device.save() 138 del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 139 return self.executor.stage_ok()
PLAN_CONTEXT_SMS_DEVICE =
'goauthentik.io/stages/authenticator_sms/sms_device'
PLAN_CONTEXT_PHONE =
'phone'
28class AuthenticatorSMSChallenge(WithUserInfoChallenge): 29 """SMS Setup challenge""" 30 31 # Set to true if no previous prompt stage set the phone number 32 # this stage will also check prompt_data.phone 33 phone_number_required = BooleanField(default=True) 34 component = CharField(default="ak-stage-authenticator-sms")
SMS Setup challenge
37class AuthenticatorSMSChallengeResponse(ChallengeResponse): 38 """SMS Challenge response, device is set by get_response_instance""" 39 40 device: SMSDevice 41 42 code = CharField(required=False) 43 phone_number = CharField(required=False) 44 45 component = CharField(default="ak-stage-authenticator-sms") 46 47 def validate(self, attrs: dict) -> dict: 48 """Check""" 49 if "code" not in attrs: 50 if "phone_number" not in attrs: 51 raise ValidationError("phone_number required") 52 self.device.phone_number = attrs["phone_number"] 53 self.stage.validate_and_send(attrs["phone_number"]) 54 return super().validate(attrs) 55 if not self.device.verify_token(str(attrs["code"])): 56 raise ValidationError(_("Code does not match")) 57 self.device.confirmed = True 58 return super().validate(attrs)
SMS Challenge response, device is set by get_response_instance
def
validate(self, attrs: dict) -> dict:
47 def validate(self, attrs: dict) -> dict: 48 """Check""" 49 if "code" not in attrs: 50 if "phone_number" not in attrs: 51 raise ValidationError("phone_number required") 52 self.device.phone_number = attrs["phone_number"] 53 self.stage.validate_and_send(attrs["phone_number"]) 54 return super().validate(attrs) 55 if not self.device.verify_token(str(attrs["code"])): 56 raise ValidationError(_("Code does not match")) 57 self.device.confirmed = True 58 return super().validate(attrs)
Check
61class AuthenticatorSMSStageView(ChallengeStageView): 62 """OTP sms Setup stage""" 63 64 response_class = AuthenticatorSMSChallengeResponse 65 66 def validate_and_send(self, phone_number: str): 67 """Validate phone number and send message""" 68 stage: AuthenticatorSMSStage = self.executor.current_stage 69 hashed_number = hash_phone_number(phone_number) 70 query = Q(phone_number=hashed_number) | Q(phone_number=phone_number) 71 if SMSDevice.objects.filter(query, stage=stage.pk).exists(): 72 raise ValidationError(_("Invalid phone number")) 73 # No code yet, but we have a phone number, so send a verification message 74 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 75 stage.send(self.request, device.token, device) 76 77 def _has_phone_number(self) -> str | None: 78 context = self.executor.plan.context 79 if PLAN_CONTEXT_PHONE in context.get(PLAN_CONTEXT_PROMPT, {}): 80 self.logger.debug("got phone number from plan context") 81 return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_PHONE) 82 if PLAN_CONTEXT_SMS_DEVICE in self.executor.plan.context: 83 self.logger.debug("got phone number from device in session") 84 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 85 if device.phone_number == "": 86 return None 87 return device.phone_number 88 return None 89 90 def get_challenge(self, *args, **kwargs) -> Challenge: 91 return AuthenticatorSMSChallenge( 92 data={ 93 "phone_number_required": self._has_phone_number() is None, 94 } 95 ) 96 97 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 98 response = super().get_response_instance(data) 99 response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 100 return response 101 102 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 103 user = self.get_pending_user() 104 105 stage: AuthenticatorSMSStage = self.executor.current_stage 106 107 if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context: 108 device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device") 109 device.generate_token(commit=False) 110 self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device 111 if phone_number := self._has_phone_number(): 112 device.phone_number = phone_number 113 try: 114 self.validate_and_send(phone_number) 115 except ValidationError as exc: 116 # We had a phone number given already (at this point only possible from flow 117 # context), but an error occurred while sending a number (most likely) 118 # due to a duplicate device, so delete the number we got given, reset the state 119 # (ish) and retry 120 device.phone_number = "" 121 self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop( 122 PLAN_CONTEXT_PHONE, None 123 ) 124 self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None) 125 self.logger.warning("failed to send SMS message to pre-set number", exc=exc) 126 return self.get(request, *args, **kwargs) 127 return super().get(request, *args, **kwargs) 128 129 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 130 """SMS Token is validated by challenge""" 131 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 132 if not device.confirmed: 133 return self.challenge_invalid(response) 134 stage: AuthenticatorSMSStage = self.executor.current_stage 135 if stage.verify_only: 136 self.logger.debug("Hashing number on device") 137 device.set_hashed_number() 138 device.save() 139 del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 140 return self.executor.stage_ok()
OTP sms Setup stage
response_class =
<class 'AuthenticatorSMSChallengeResponse'>
def
validate_and_send(self, phone_number: str):
66 def validate_and_send(self, phone_number: str): 67 """Validate phone number and send message""" 68 stage: AuthenticatorSMSStage = self.executor.current_stage 69 hashed_number = hash_phone_number(phone_number) 70 query = Q(phone_number=hashed_number) | Q(phone_number=phone_number) 71 if SMSDevice.objects.filter(query, stage=stage.pk).exists(): 72 raise ValidationError(_("Invalid phone number")) 73 # No code yet, but we have a phone number, so send a verification message 74 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 75 stage.send(self.request, device.token, device)
Validate phone number and send message
90 def get_challenge(self, *args, **kwargs) -> Challenge: 91 return AuthenticatorSMSChallenge( 92 data={ 93 "phone_number_required": self._has_phone_number() is None, 94 } 95 )
Return the challenge that the client should solve
def
get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
97 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 98 response = super().get_response_instance(data) 99 response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 100 return response
Return the response class type
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
102 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 103 user = self.get_pending_user() 104 105 stage: AuthenticatorSMSStage = self.executor.current_stage 106 107 if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context: 108 device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device") 109 device.generate_token(commit=False) 110 self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device 111 if phone_number := self._has_phone_number(): 112 device.phone_number = phone_number 113 try: 114 self.validate_and_send(phone_number) 115 except ValidationError as exc: 116 # We had a phone number given already (at this point only possible from flow 117 # context), but an error occurred while sending a number (most likely) 118 # due to a duplicate device, so delete the number we got given, reset the state 119 # (ish) and retry 120 device.phone_number = "" 121 self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop( 122 PLAN_CONTEXT_PHONE, None 123 ) 124 self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None) 125 self.logger.warning("failed to send SMS message to pre-set number", exc=exc) 126 return self.get(request, *args, **kwargs) 127 return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
129 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 130 """SMS Token is validated by challenge""" 131 device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 132 if not device.confirmed: 133 return self.challenge_invalid(response) 134 stage: AuthenticatorSMSStage = self.executor.current_stage 135 if stage.verify_only: 136 self.logger.debug("Hashing number on device") 137 device.set_hashed_number() 138 device.save() 139 del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] 140 return self.executor.stage_ok()
SMS Token is validated by challenge