authentik.stages.authenticator_sms.models
SMS Authenticator models
"""SMS Authenticator models"""

from hashlib import sha256

from django.contrib.auth import get_user_model
from django.db import models
from django.http import HttpRequest, HttpResponseBadRequest
from django.utils.translation import gettext_lazy as _
from django.views import View
from requests.exceptions import RequestException
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger
from twilio.base.exceptions import TwilioRestException
from twilio.rest import Client

from authentik.core.types import UserSettingSerializer
from authentik.events.models import Event, EventAction, NotificationWebhookMapping
from authentik.events.utils import sanitize_item
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.lib.utils.http import get_http_session
from authentik.stages.authenticator.models import SideChannelDevice

LOGGER = get_logger()


class SMSProviders(models.TextChoices):
    """Supported SMS Providers"""

    TWILIO = "twilio"
    GENERIC = "generic"


class SMSAuthTypes(models.TextChoices):
    """Supported SMS Auth Types"""

    BASIC = "basic"
    BEARER = "bearer"


class AuthenticatorSMSStage(ConfigurableStage, FriendlyNamedStage, Stage):
    """Use SMS-based TOTP instead of authenticator-based."""

    provider = models.TextField(choices=SMSProviders.choices)

    from_number = models.TextField()

    # Twilio: passed to the client as its first credential.
    # Generic: used as the URL the request is POSTed to.
    account_sid = models.TextField()
    # Twilio: second client credential. Generic: bearer token or basic-auth username.
    auth = models.TextField()
    # Only used as the basic-auth password of the generic provider.
    auth_password = models.TextField(default="", blank=True)
    auth_type = models.TextField(choices=SMSAuthTypes.choices, default=SMSAuthTypes.BASIC)

    verify_only = models.BooleanField(
        default=False,
        help_text=_(
            "When enabled, the Phone number is only used during enrollment to verify the "
            "users authenticity. Only a hash of the phone number is saved to ensure it is "
            "not reused in the future."
        ),
    )

    mapping = models.ForeignKey(
        NotificationWebhookMapping,
        null=True,
        default=None,
        on_delete=models.SET_NULL,
        help_text=_("Optionally modify the payload being sent to custom providers."),
    )

    def send(self, request: HttpRequest, token: str, device: SMSDevice):
        """Send message via selected provider

        Raises:
            ValueError: the configured provider is neither Twilio nor generic.
        """
        if self.provider == SMSProviders.TWILIO:
            return self.send_twilio(request, token, device)
        if self.provider == SMSProviders.GENERIC:
            return self.send_generic(request, token, device)
        raise ValueError(f"invalid provider {self.provider}")

    def get_message(self, token: str) -> str:
        """Get SMS message

        The template is translated first and the token substituted afterwards, so the
        catalog lookup uses the constant message ID rather than a per-token string.
        """
        return _("Use this code to authenticate in authentik: {token}").format(token=token)

    def send_twilio(self, request: HttpRequest, token: str, device: SMSDevice):
        """send sms via twilio provider

        Raises:
            ValidationError: Twilio rejected the request; carries Twilio's message.
        """
        client = Client(self.account_sid, self.auth)
        message_body = str(self.get_message(token))
        if self.mapping:
            payload = sanitize_item(
                self.mapping.evaluate(
                    user=device.user,
                    request=request,
                    device=device,
                    token=token,
                    stage=self,
                )
            )
            # The mapping may override the text through a "message" key
            message_body = payload.get("message", message_body)

        try:
            message = client.messages.create(
                to=device.phone_number, from_=self.from_number, body=message_body
            )
            LOGGER.debug("Sent SMS", to=device, message=message.sid)
        except TwilioRestException as exc:
            LOGGER.warning("Error sending token by Twilio SMS", exc=exc, msg=exc.msg)
            raise ValidationError(exc.msg) from None

    def send_generic(self, request: HttpRequest, token: str, device: SMSDevice):
        """Send SMS via outside API

        POSTs a JSON payload to the URL stored in ``account_sid``, authenticating with a
        bearer token or HTTP basic auth depending on ``auth_type``. When a mapping is
        configured, its sanitized result replaces the default payload entirely.

        Raises:
            ValueError: ``auth_type`` is neither bearer nor basic.
            ValidationError: the API answered with an error status; carries the
                response body.
            RequestException: the request failed without a response; re-raised after
                being logged and recorded as a configuration error event.
        """
        payload = {
            "From": self.from_number,
            "To": device.phone_number,
            "Body": token,
            "Message": str(self.get_message(token)),
        }

        if self.mapping:
            payload = sanitize_item(
                self.mapping.evaluate(
                    user=device.user,
                    request=request,
                    device=device,
                    token=token,
                    stage=self,
                )
            )

        if self.auth_type == SMSAuthTypes.BEARER:
            auth_kwargs = {"headers": {"Authorization": f"Bearer {self.auth}"}}
        elif self.auth_type == SMSAuthTypes.BASIC:
            auth_kwargs = {"auth": (self.auth, self.auth_password)}
        else:
            raise ValueError(f"Invalid Auth type '{self.auth_type}'")

        # Stays None when the request fails before any response is received
        response = None
        try:
            response = get_http_session().post(self.account_sid, json=payload, **auth_kwargs)
            response.raise_for_status()
        except RequestException as exc:
            status_code = None if response is None else response.status_code
            body = "" if response is None else response.text
            LOGGER.warning(
                "Error sending token by generic SMS",
                exc=exc,
                status=status_code,
                body=body[:100],
            )
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message="Error sending SMS",
                status_code=status_code,
                body=body,
            ).with_exception(exc).set_user(device.user).save()
            if status_code is not None and status_code >= HttpResponseBadRequest.status_code:
                raise ValidationError(body) from None
            raise
        # Only report success once the API has accepted the request
        LOGGER.debug("Sent SMS", to=device.phone_number)

    @property
    def serializer(self) -> type[BaseSerializer]:
        from authentik.stages.authenticator_sms.api import AuthenticatorSMSStageSerializer

        return AuthenticatorSMSStageSerializer

    @property
    def view(self) -> type[View]:
        from authentik.stages.authenticator_sms.stage import AuthenticatorSMSStageView

        return AuthenticatorSMSStageView

    @property
    def component(self) -> str:
        return "ak-stage-authenticator-sms-form"

    def ui_user_settings(self) -> UserSettingSerializer | None:
        return UserSettingSerializer(
            data={
                "title": self.friendly_name or str(self._meta.verbose_name),
                "component": "ak-user-settings-authenticator-sms",
            }
        )

    def __str__(self) -> str:
        return f"SMS Authenticator Setup Stage {self.name}"

    class Meta:
        verbose_name = _("SMS Authenticator Setup Stage")
        verbose_name_plural = _("SMS Authenticator Setup Stages")


def hash_phone_number(phone_number: str) -> str:
    """Hash phone number with prefix"""
    return "hash:" + sha256(phone_number.encode()).hexdigest()


class SMSDevice(SerializerModel, SideChannelDevice):
    """SMS Device"""

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)

    # Connect to the stage so that when validating access we know the API credentials
    stage = models.ForeignKey(AuthenticatorSMSStage, on_delete=models.CASCADE)

    phone_number = models.TextField()

    last_t = models.DateTimeField(auto_now=True)

    def set_hashed_number(self):
        """Set phone_number to hashed number

        Does nothing when the number is already hashed, so repeated calls never store
        a hash of a hash.
        """
        if self.is_hashed:
            return
        self.phone_number = hash_phone_number(self.phone_number)

    @property
    def is_hashed(self) -> bool:
        """Check if the phone number is hashed"""
        return self.phone_number.startswith("hash:")

    @property
    def serializer(self) -> type[BaseSerializer]:
        from authentik.stages.authenticator_sms.api import SMSDeviceSerializer

        return SMSDeviceSerializer

    def verify_token(self, token):
        """Verify the token with the parent implementation, saving the device on success"""
        valid = super().verify_token(token)
        if valid:
            self.save()
        return valid

    def __str__(self):
        # Fall back to the user ID when the name is empty or unset
        return str(self.name or self.user_id)

    class Meta:
        verbose_name = _("SMS Device")
        verbose_name_plural = _("SMS Devices")
        unique_together = (("stage", "phone_number"),)
class SMSProviders(models.TextChoices):
    """Supported SMS Providers

    The stored value of each member is the lowercase string assigned to it.
    """

    # Messages are sent through the Twilio client
    TWILIO = "twilio"
    # Messages are sent as an HTTP request to a configured endpoint
    GENERIC = "generic"
Supported SMS Providers
class SMSAuthTypes(models.TextChoices):
    """Supported SMS Auth Types

    The stored value of each member is the lowercase string assigned to it.
    """

    # HTTP basic authentication
    BASIC = "basic"
    # "Authorization: Bearer ..." header
    BEARER = "bearer"
Supported SMS Auth Types
class AuthenticatorSMSStage(ConfigurableStage, FriendlyNamedStage, Stage):
    """Use SMS-based TOTP instead of authenticator-based."""

    provider = models.TextField(choices=SMSProviders.choices)

    from_number = models.TextField()

    # Twilio: passed to the client as its first credential.
    # Generic: used as the URL the request is POSTed to.
    account_sid = models.TextField()
    # Twilio: second client credential. Generic: bearer token or basic-auth username.
    auth = models.TextField()
    # Only used as the basic-auth password of the generic provider.
    auth_password = models.TextField(default="", blank=True)
    auth_type = models.TextField(choices=SMSAuthTypes.choices, default=SMSAuthTypes.BASIC)

    verify_only = models.BooleanField(
        default=False,
        help_text=_(
            "When enabled, the Phone number is only used during enrollment to verify the "
            "users authenticity. Only a hash of the phone number is saved to ensure it is "
            "not reused in the future."
        ),
    )

    mapping = models.ForeignKey(
        NotificationWebhookMapping,
        null=True,
        default=None,
        on_delete=models.SET_NULL,
        help_text=_("Optionally modify the payload being sent to custom providers."),
    )

    def send(self, request: HttpRequest, token: str, device: SMSDevice):
        """Send message via selected provider

        Raises:
            ValueError: the configured provider is neither Twilio nor generic.
        """
        if self.provider == SMSProviders.TWILIO:
            return self.send_twilio(request, token, device)
        if self.provider == SMSProviders.GENERIC:
            return self.send_generic(request, token, device)
        raise ValueError(f"invalid provider {self.provider}")

    def get_message(self, token: str) -> str:
        """Get SMS message

        The template is translated first and the token substituted afterwards, so the
        catalog lookup uses the constant message ID rather than a per-token string.
        """
        return _("Use this code to authenticate in authentik: {token}").format(token=token)

    def send_twilio(self, request: HttpRequest, token: str, device: SMSDevice):
        """send sms via twilio provider

        Raises:
            ValidationError: Twilio rejected the request; carries Twilio's message.
        """
        client = Client(self.account_sid, self.auth)
        message_body = str(self.get_message(token))
        if self.mapping:
            payload = sanitize_item(
                self.mapping.evaluate(
                    user=device.user,
                    request=request,
                    device=device,
                    token=token,
                    stage=self,
                )
            )
            # The mapping may override the text through a "message" key
            message_body = payload.get("message", message_body)

        try:
            message = client.messages.create(
                to=device.phone_number, from_=self.from_number, body=message_body
            )
            LOGGER.debug("Sent SMS", to=device, message=message.sid)
        except TwilioRestException as exc:
            LOGGER.warning("Error sending token by Twilio SMS", exc=exc, msg=exc.msg)
            raise ValidationError(exc.msg) from None

    def send_generic(self, request: HttpRequest, token: str, device: SMSDevice):
        """Send SMS via outside API

        POSTs a JSON payload to the URL stored in ``account_sid``, authenticating with a
        bearer token or HTTP basic auth depending on ``auth_type``. When a mapping is
        configured, its sanitized result replaces the default payload entirely.

        Raises:
            ValueError: ``auth_type`` is neither bearer nor basic.
            ValidationError: the API answered with an error status; carries the
                response body.
            RequestException: the request failed without a response; re-raised after
                being logged and recorded as a configuration error event.
        """
        payload = {
            "From": self.from_number,
            "To": device.phone_number,
            "Body": token,
            "Message": str(self.get_message(token)),
        }

        if self.mapping:
            payload = sanitize_item(
                self.mapping.evaluate(
                    user=device.user,
                    request=request,
                    device=device,
                    token=token,
                    stage=self,
                )
            )

        if self.auth_type == SMSAuthTypes.BEARER:
            auth_kwargs = {"headers": {"Authorization": f"Bearer {self.auth}"}}
        elif self.auth_type == SMSAuthTypes.BASIC:
            auth_kwargs = {"auth": (self.auth, self.auth_password)}
        else:
            raise ValueError(f"Invalid Auth type '{self.auth_type}'")

        # Stays None when the request fails before any response is received
        response = None
        try:
            response = get_http_session().post(self.account_sid, json=payload, **auth_kwargs)
            response.raise_for_status()
        except RequestException as exc:
            status_code = None if response is None else response.status_code
            body = "" if response is None else response.text
            LOGGER.warning(
                "Error sending token by generic SMS",
                exc=exc,
                status=status_code,
                body=body[:100],
            )
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message="Error sending SMS",
                status_code=status_code,
                body=body,
            ).with_exception(exc).set_user(device.user).save()
            if status_code is not None and status_code >= HttpResponseBadRequest.status_code:
                raise ValidationError(body) from None
            raise
        # Only report success once the API has accepted the request
        LOGGER.debug("Sent SMS", to=device.phone_number)

    @property
    def serializer(self) -> type[BaseSerializer]:
        from authentik.stages.authenticator_sms.api import AuthenticatorSMSStageSerializer

        return AuthenticatorSMSStageSerializer

    @property
    def view(self) -> type[View]:
        from authentik.stages.authenticator_sms.stage import AuthenticatorSMSStageView

        return AuthenticatorSMSStageView

    @property
    def component(self) -> str:
        return "ak-stage-authenticator-sms-form"

    def ui_user_settings(self) -> UserSettingSerializer | None:
        return UserSettingSerializer(
            data={
                "title": self.friendly_name or str(self._meta.verbose_name),
                "component": "ak-user-settings-authenticator-sms",
            }
        )

    def __str__(self) -> str:
        return f"SMS Authenticator Setup Stage {self.name}"

    class Meta:
        verbose_name = _("SMS Authenticator Setup Stage")
        verbose_name_plural = _("SMS Authenticator Setup Stages")
Use SMS-based TOTP instead of authenticator-based.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
def send(self, request: HttpRequest, token: str, device: SMSDevice):
    """Dispatch the message to the sender that matches the configured provider.

    Raises:
        ValueError: the configured provider is neither Twilio nor generic.
    """
    provider = self.provider
    if provider == SMSProviders.TWILIO:
        sender = self.send_twilio
    elif provider == SMSProviders.GENERIC:
        sender = self.send_generic
    else:
        raise ValueError(f"invalid provider {provider}")
    return sender(request, token, device)
Send message via selected provider
def get_message(self, token: str) -> str:
    """Get SMS message

    The template is translated first and the token substituted afterwards, so the
    catalog lookup uses the constant message ID rather than a per-token string.

    Args:
        token: The one-time code to embed in the message.

    Returns:
        The translated message containing the token.
    """
    return _("Use this code to authenticate in authentik: {token}").format(token=token)
Get SMS message
def send_twilio(self, request: HttpRequest, token: str, device: SMSDevice):
    """Deliver the token to the device's phone number through Twilio.

    The default text comes from ``get_message``; a configured mapping may replace it
    through the "message" key of its sanitized result.

    Raises:
        ValidationError: Twilio rejected the request; carries Twilio's message.
    """
    twilio = Client(self.account_sid, self.auth)
    text = str(self.get_message(token))
    if self.mapping:
        evaluated = self.mapping.evaluate(
            user=device.user,
            request=request,
            device=device,
            token=token,
            stage=self,
        )
        text = sanitize_item(evaluated).get("message", text)

    try:
        sent = twilio.messages.create(
            to=device.phone_number,
            from_=self.from_number,
            body=text,
        )
        LOGGER.debug("Sent SMS", to=device, message=sent.sid)
    except TwilioRestException as exc:
        LOGGER.warning("Error sending token by Twilio SMS", exc=exc, msg=exc.msg)
        raise ValidationError(exc.msg) from None
send sms via twilio provider
def send_generic(self, request: HttpRequest, token: str, device: SMSDevice):
    """Send SMS via outside API

    POSTs a JSON payload to the URL stored in ``account_sid``, authenticating with a
    bearer token or HTTP basic auth depending on ``auth_type``. When a mapping is
    configured, its sanitized result replaces the default payload entirely.

    Raises:
        ValueError: ``auth_type`` is neither bearer nor basic.
        ValidationError: the API answered with an error status; carries the
            response body.
        RequestException: the request failed without a response; re-raised after
            being logged and recorded as a configuration error event.
    """
    payload = {
        "From": self.from_number,
        "To": device.phone_number,
        "Body": token,
        "Message": str(self.get_message(token)),
    }

    if self.mapping:
        payload = sanitize_item(
            self.mapping.evaluate(
                user=device.user,
                request=request,
                device=device,
                token=token,
                stage=self,
            )
        )

    if self.auth_type == SMSAuthTypes.BEARER:
        auth_kwargs = {"headers": {"Authorization": f"Bearer {self.auth}"}}
    elif self.auth_type == SMSAuthTypes.BASIC:
        auth_kwargs = {"auth": (self.auth, self.auth_password)}
    else:
        raise ValueError(f"Invalid Auth type '{self.auth_type}'")

    # Stays None when the request fails before any response is received
    response = None
    try:
        response = get_http_session().post(self.account_sid, json=payload, **auth_kwargs)
        response.raise_for_status()
    except RequestException as exc:
        status_code = None if response is None else response.status_code
        body = "" if response is None else response.text
        LOGGER.warning(
            "Error sending token by generic SMS",
            exc=exc,
            status=status_code,
            body=body[:100],
        )
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message="Error sending SMS",
            status_code=status_code,
            body=body,
        ).with_exception(exc).set_user(device.user).save()
        if status_code is not None and status_code >= HttpResponseBadRequest.status_code:
            raise ValidationError(body) from None
        raise
    # Only report success once the API has accepted the request
    LOGGER.debug("Sent SMS", to=device.phone_number)
Send SMS via outside API
@property
def serializer(self) -> type[BaseSerializer]:
    """Get serializer for this model"""
    # Imported here rather than at module level - presumably to avoid a circular
    # import with the API module; confirm before moving it.
    from authentik.stages.authenticator_sms.api import (
        AuthenticatorSMSStageSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
@property
def view(self) -> type[View]:
    """Return StageView class that implements logic for this stage"""
    # Imported here rather than at module level - presumably to avoid a circular
    # import with the stage module; confirm before moving it.
    from authentik.stages.authenticator_sms.stage import (
        AuthenticatorSMSStageView as view_class,
    )

    return view_class
Return StageView class that implements logic for this stage
def ui_user_settings(self) -> UserSettingSerializer | None:
    """Build the user-settings entry for this stage.

    The title is the stage's friendly name, falling back to the model's verbose name
    when no friendly name is set.
    """
    title = self.friendly_name or str(self._meta.verbose_name)
    settings_data = {
        "title": title,
        "component": "ak-user-settings-authenticator-sms",
    }
    return UserSettingSerializer(data=settings_data)
Entrypoint to integrate with User settings. Can either return None if no user settings are available, or a challenge.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
- authentik.flows.models.Stage
- stage_uuid
- name
- objects
- is_in_memory
- flow_set
- flowstagebinding_set
- emailstage
- endpointstage
- invitationstage
- passwordstage
- promptstage
- authenticatorstaticstage
- authenticatorduostage
- authenticatoremailstage
- authenticatorsmsstage
- authenticatorwebauthnstage
- authenticatorvalidatestage
- captchastage
- identificationstage
- authenticatortotpstage
- consentstage
- denystage
- dummystage
- redirectstage
- userdeletestage
- userloginstage
- userlogoutstage
- userwritestage
- authenticatorendpointgdtcstage
- mutualtlsstage
- sourcestage
The requested object does not exist
The query returned multiple objects when only one was expected.
def hash_phone_number(phone_number: str) -> str:
    """Return the SHA-256 hex digest of the phone number, prefixed with "hash:"."""
    digest = sha256(phone_number.encode()).hexdigest()
    return f"hash:{digest}"
Hash phone number with prefix
class SMSDevice(SerializerModel, SideChannelDevice):
    """SMS Device"""

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)

    # Connect to the stage so that when validating access we know the API credentials
    stage = models.ForeignKey(AuthenticatorSMSStage, on_delete=models.CASCADE)

    # Holds either the plain number or its "hash:"-prefixed digest (see is_hashed)
    phone_number = models.TextField()

    # Refreshed on every save (auto_now)
    last_t = models.DateTimeField(auto_now=True)

    def set_hashed_number(self):
        """Set phone_number to hashed number

        Does nothing when the number is already hashed, so repeated calls never store
        a hash of a hash.
        """
        if self.is_hashed:
            return
        self.phone_number = hash_phone_number(self.phone_number)

    @property
    def is_hashed(self) -> bool:
        """Check if the phone number is hashed"""
        return self.phone_number.startswith("hash:")

    @property
    def serializer(self) -> type[BaseSerializer]:
        from authentik.stages.authenticator_sms.api import SMSDeviceSerializer

        return SMSDeviceSerializer

    def verify_token(self, token):
        """Verify the token with the parent implementation, saving the device on success"""
        valid = super().verify_token(token)
        if valid:
            self.save()
        return valid

    def __str__(self):
        # Fall back to the user ID when the name is empty or unset
        return str(self.name or self.user_id)

    class Meta:
        verbose_name = _("SMS Device")
        verbose_name_plural = _("SMS Devices")
        unique_together = (("stage", "phone_number"),)
SMS Device
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
def set_hashed_number(self):
    """Set phone_number to hashed number

    Does nothing when the number is already hashed, so repeated calls never store a
    hash of a hash. The change is made on the instance only; this method does not save.
    """
    if self.is_hashed:
        return
    self.phone_number = hash_phone_number(self.phone_number)
Set phone_number to hashed number
@property
def is_hashed(self) -> bool:
    """Tell whether the stored phone number carries the "hash:" prefix."""
    hash_prefix = "hash:"
    return self.phone_number.startswith(hash_prefix)
Check if the phone number is hashed
@property
def serializer(self) -> type[BaseSerializer]:
    """Get serializer for this model"""
    # Imported here rather than at module level - presumably to avoid a circular
    # import with the API module; confirm before moving it.
    from authentik.stages.authenticator_sms.api import (
        SMSDeviceSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
def verify_token(self, token):
    """Check the token with the parent implementation and save the device on success.

    Returns whatever the parent implementation returned.
    """
    outcome = super().verify_token(token)
    if not outcome:
        return outcome
    self.save()
    return outcome
Verifies a token by content and expiry.
On success, the token is cleared and the device saved.
:param str token: The OTP token provided by the user. :rtype: bool
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.