authentik.stages.authenticator_sms.stage

SMS Setup stage

  1"""SMS Setup stage"""
  2
  3from django.db.models import Q
  4from django.http import HttpRequest, HttpResponse
  5from django.http.request import QueryDict
  6from django.utils.translation import gettext_lazy as _
  7from rest_framework.exceptions import ValidationError
  8from rest_framework.fields import BooleanField, CharField
  9
 10from authentik.flows.challenge import (
 11    Challenge,
 12    ChallengeResponse,
 13    WithUserInfoChallenge,
 14)
 15from authentik.flows.stage import ChallengeStageView
 16from authentik.stages.authenticator_sms.models import (
 17    AuthenticatorSMSStage,
 18    SMSDevice,
 19    hash_phone_number,
 20)
 21from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 22
 23PLAN_CONTEXT_SMS_DEVICE = "goauthentik.io/stages/authenticator_sms/sms_device"
 24PLAN_CONTEXT_PHONE = "phone"
 25
 26
 27class AuthenticatorSMSChallenge(WithUserInfoChallenge):
 28    """SMS Setup challenge"""
 29
 30    # Set to true if no previous prompt stage set the phone number
 31    # this stage will also check prompt_data.phone
 32    phone_number_required = BooleanField(default=True)
 33    component = CharField(default="ak-stage-authenticator-sms")
 34
 35
 36class AuthenticatorSMSChallengeResponse(ChallengeResponse):
 37    """SMS Challenge response, device is set by get_response_instance"""
 38
 39    device: SMSDevice
 40
 41    code = CharField(required=False)
 42    phone_number = CharField(required=False)
 43
 44    component = CharField(default="ak-stage-authenticator-sms")
 45
 46    def validate(self, attrs: dict) -> dict:
 47        """Check"""
 48        if "code" not in attrs:
 49            if "phone_number" not in attrs:
 50                raise ValidationError("phone_number required")
 51            self.device.phone_number = attrs["phone_number"]
 52            self.stage.validate_and_send(attrs["phone_number"])
 53            return super().validate(attrs)
 54        if not self.device.verify_token(str(attrs["code"])):
 55            raise ValidationError(_("Code does not match"))
 56        self.device.confirmed = True
 57        return super().validate(attrs)
 58
 59
 60class AuthenticatorSMSStageView(ChallengeStageView):
 61    """OTP sms Setup stage"""
 62
 63    response_class = AuthenticatorSMSChallengeResponse
 64
 65    def validate_and_send(self, phone_number: str):
 66        """Validate phone number and send message"""
 67        stage: AuthenticatorSMSStage = self.executor.current_stage
 68        hashed_number = hash_phone_number(phone_number)
 69        query = Q(phone_number=hashed_number) | Q(phone_number=phone_number)
 70        if SMSDevice.objects.filter(query, stage=stage.pk).exists():
 71            raise ValidationError(_("Invalid phone number"))
 72        # No code yet, but we have a phone number, so send a verification message
 73        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
 74        stage.send(self.request, device.token, device)
 75
 76    def _has_phone_number(self) -> str | None:
 77        context = self.executor.plan.context
 78        if PLAN_CONTEXT_PHONE in context.get(PLAN_CONTEXT_PROMPT, {}):
 79            self.logger.debug("got phone number from plan context")
 80            return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_PHONE)
 81        if PLAN_CONTEXT_SMS_DEVICE in self.executor.plan.context:
 82            self.logger.debug("got phone number from device in session")
 83            device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
 84            if device.phone_number == "":
 85                return None
 86            return device.phone_number
 87        return None
 88
 89    def get_challenge(self, *args, **kwargs) -> Challenge:
 90        return AuthenticatorSMSChallenge(
 91            data={
 92                "phone_number_required": self._has_phone_number() is None,
 93            }
 94        )
 95
 96    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 97        response = super().get_response_instance(data)
 98        response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
 99        return response
100
101    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
102        user = self.get_pending_user()
103
104        stage: AuthenticatorSMSStage = self.executor.current_stage
105
106        if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context:
107            device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device")
108            device.generate_token(commit=False)
109            self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device
110            if phone_number := self._has_phone_number():
111                device.phone_number = phone_number
112                try:
113                    self.validate_and_send(phone_number)
114                except ValidationError as exc:
115                    # We had a phone number given already (at this point only possible from flow
116                    # context), but an error occurred while sending a number (most likely)
117                    # due to a duplicate device, so delete the number we got given, reset the state
118                    # (ish) and retry
119                    device.phone_number = ""
120                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
121                        PLAN_CONTEXT_PHONE, None
122                    )
123                    self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None)
124                    self.logger.warning("failed to send SMS message to pre-set number", exc=exc)
125                    return self.get(request, *args, **kwargs)
126        return super().get(request, *args, **kwargs)
127
128    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
129        """SMS Token is validated by challenge"""
130        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
131        if not device.confirmed:
132            return self.challenge_invalid(response)
133        stage: AuthenticatorSMSStage = self.executor.current_stage
134        if stage.verify_only:
135            self.logger.debug("Hashing number on device")
136            device.set_hashed_number()
137        device.save()
138        del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
139        return self.executor.stage_ok()
PLAN_CONTEXT_SMS_DEVICE = 'goauthentik.io/stages/authenticator_sms/sms_device'
PLAN_CONTEXT_PHONE = 'phone'
class AuthenticatorSMSChallenge(authentik.flows.challenge.WithUserInfoChallenge):
28class AuthenticatorSMSChallenge(WithUserInfoChallenge):
29    """SMS Setup challenge"""
30
31    # Set to true if no previous prompt stage set the phone number
32    # this stage will also check prompt_data.phone
33    phone_number_required = BooleanField(default=True)
34    component = CharField(default="ak-stage-authenticator-sms")

SMS Setup challenge

phone_number_required
component
class AuthenticatorSMSChallengeResponse(authentik.flows.challenge.ChallengeResponse):
37class AuthenticatorSMSChallengeResponse(ChallengeResponse):
38    """SMS Challenge response, device is set by get_response_instance"""
39
40    device: SMSDevice
41
42    code = CharField(required=False)
43    phone_number = CharField(required=False)
44
45    component = CharField(default="ak-stage-authenticator-sms")
46
47    def validate(self, attrs: dict) -> dict:
48        """Check"""
49        if "code" not in attrs:
50            if "phone_number" not in attrs:
51                raise ValidationError("phone_number required")
52            self.device.phone_number = attrs["phone_number"]
53            self.stage.validate_and_send(attrs["phone_number"])
54            return super().validate(attrs)
55        if not self.device.verify_token(str(attrs["code"])):
56            raise ValidationError(_("Code does not match"))
57        self.device.confirmed = True
58        return super().validate(attrs)

SMS Challenge response, device is set by get_response_instance

code
phone_number
component
def validate(self, attrs: dict) -> dict:
47    def validate(self, attrs: dict) -> dict:
48        """Check"""
49        if "code" not in attrs:
50            if "phone_number" not in attrs:
51                raise ValidationError("phone_number required")
52            self.device.phone_number = attrs["phone_number"]
53            self.stage.validate_and_send(attrs["phone_number"])
54            return super().validate(attrs)
55        if not self.device.verify_token(str(attrs["code"])):
56            raise ValidationError(_("Code does not match"))
57        self.device.confirmed = True
58        return super().validate(attrs)

Check

class AuthenticatorSMSStageView(authentik.flows.stage.ChallengeStageView):
 61class AuthenticatorSMSStageView(ChallengeStageView):
 62    """OTP sms Setup stage"""
 63
 64    response_class = AuthenticatorSMSChallengeResponse
 65
 66    def validate_and_send(self, phone_number: str):
 67        """Validate phone number and send message"""
 68        stage: AuthenticatorSMSStage = self.executor.current_stage
 69        hashed_number = hash_phone_number(phone_number)
 70        query = Q(phone_number=hashed_number) | Q(phone_number=phone_number)
 71        if SMSDevice.objects.filter(query, stage=stage.pk).exists():
 72            raise ValidationError(_("Invalid phone number"))
 73        # No code yet, but we have a phone number, so send a verification message
 74        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
 75        stage.send(self.request, device.token, device)
 76
 77    def _has_phone_number(self) -> str | None:
 78        context = self.executor.plan.context
 79        if PLAN_CONTEXT_PHONE in context.get(PLAN_CONTEXT_PROMPT, {}):
 80            self.logger.debug("got phone number from plan context")
 81            return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_PHONE)
 82        if PLAN_CONTEXT_SMS_DEVICE in self.executor.plan.context:
 83            self.logger.debug("got phone number from device in session")
 84            device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
 85            if device.phone_number == "":
 86                return None
 87            return device.phone_number
 88        return None
 89
 90    def get_challenge(self, *args, **kwargs) -> Challenge:
 91        return AuthenticatorSMSChallenge(
 92            data={
 93                "phone_number_required": self._has_phone_number() is None,
 94            }
 95        )
 96
 97    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 98        response = super().get_response_instance(data)
 99        response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
100        return response
101
102    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
103        user = self.get_pending_user()
104
105        stage: AuthenticatorSMSStage = self.executor.current_stage
106
107        if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context:
108            device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device")
109            device.generate_token(commit=False)
110            self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device
111            if phone_number := self._has_phone_number():
112                device.phone_number = phone_number
113                try:
114                    self.validate_and_send(phone_number)
115                except ValidationError as exc:
116                    # We had a phone number given already (at this point only possible from flow
117                    # context), but an error occurred while sending a number (most likely)
118                    # due to a duplicate device, so delete the number we got given, reset the state
119                    # (ish) and retry
120                    device.phone_number = ""
121                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
122                        PLAN_CONTEXT_PHONE, None
123                    )
124                    self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None)
125                    self.logger.warning("failed to send SMS message to pre-set number", exc=exc)
126                    return self.get(request, *args, **kwargs)
127        return super().get(request, *args, **kwargs)
128
129    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
130        """SMS Token is validated by challenge"""
131        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
132        if not device.confirmed:
133            return self.challenge_invalid(response)
134        stage: AuthenticatorSMSStage = self.executor.current_stage
135        if stage.verify_only:
136            self.logger.debug("Hashing number on device")
137            device.set_hashed_number()
138        device.save()
139        del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
140        return self.executor.stage_ok()

OTP sms Setup stage

response_class = <class 'AuthenticatorSMSChallengeResponse'>
def validate_and_send(self, phone_number: str):
66    def validate_and_send(self, phone_number: str):
67        """Validate phone number and send message"""
68        stage: AuthenticatorSMSStage = self.executor.current_stage
69        hashed_number = hash_phone_number(phone_number)
70        query = Q(phone_number=hashed_number) | Q(phone_number=phone_number)
71        if SMSDevice.objects.filter(query, stage=stage.pk).exists():
72            raise ValidationError(_("Invalid phone number"))
73        # No code yet, but we have a phone number, so send a verification message
74        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
75        stage.send(self.request, device.token, device)

Validate phone number and send message

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
90    def get_challenge(self, *args, **kwargs) -> Challenge:
91        return AuthenticatorSMSChallenge(
92            data={
93                "phone_number_required": self._has_phone_number() is None,
94            }
95        )

Return the challenge that the client should solve

def get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
 97    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 98        response = super().get_response_instance(data)
 99        response.device = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
100        return response

Return the response class type

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
102    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
103        user = self.get_pending_user()
104
105        stage: AuthenticatorSMSStage = self.executor.current_stage
106
107        if PLAN_CONTEXT_SMS_DEVICE not in self.executor.plan.context:
108            device = SMSDevice(user=user, confirmed=False, stage=stage, name="SMS Device")
109            device.generate_token(commit=False)
110            self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE] = device
111            if phone_number := self._has_phone_number():
112                device.phone_number = phone_number
113                try:
114                    self.validate_and_send(phone_number)
115                except ValidationError as exc:
116                    # We had a phone number given already (at this point only possible from flow
117                    # context), but an error occurred while sending a number (most likely)
118                    # due to a duplicate device, so delete the number we got given, reset the state
119                    # (ish) and retry
120                    device.phone_number = ""
121                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
122                        PLAN_CONTEXT_PHONE, None
123                    )
124                    self.executor.plan.context.pop(PLAN_CONTEXT_SMS_DEVICE, None)
125                    self.logger.warning("failed to send SMS message to pre-set number", exc=exc)
126                    return self.get(request, *args, **kwargs)
127        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
129    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
130        """SMS Token is validated by challenge"""
131        device: SMSDevice = self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
132        if not device.confirmed:
133            return self.challenge_invalid(response)
134        stage: AuthenticatorSMSStage = self.executor.current_stage
135        if stage.verify_only:
136            self.logger.debug("Hashing number on device")
137            device.set_hashed_number()
138        device.save()
139        del self.executor.plan.context[PLAN_CONTEXT_SMS_DEVICE]
140        return self.executor.stage_ok()

SMS Token is validated by challenge