authentik.stages.authenticator.models
Base authenticator models
1"""Base authenticator models""" 2 3from datetime import timedelta 4 5from django.apps import apps 6from django.core.exceptions import ObjectDoesNotExist 7from django.db import models 8from django.utils import timezone 9from django.utils.functional import cached_property 10 11from authentik.core.models import User 12from authentik.lib.models import CreatedUpdatedModel 13from authentik.stages.authenticator.util import random_number_token 14 15 16class DeviceManager(models.Manager): 17 """ 18 The :class:`~django.db.models.Manager` object installed as 19 ``Device.objects``. 20 """ 21 22 def devices_for_user(self, user: User, confirmed: bool | None = None): 23 """ 24 Returns a queryset for all devices of this class that belong to the 25 given user. 26 27 :param user: The user. 28 :type user: :class:`~django.contrib.auth.models.User` 29 30 :param confirmed: If ``None``, all matching devices are returned. 31 Otherwise, this can be any true or false value to limit the query 32 to confirmed or unconfirmed devices, respectively. 33 """ 34 devices = self.model.objects.filter(user=user) 35 if confirmed is not None: 36 devices = devices.filter(confirmed=bool(confirmed)) 37 38 return devices 39 40 41class Device(CreatedUpdatedModel): 42 """ 43 Abstract base model for a :term:`device` attached to a user. Plugins must 44 subclass this to define their OTP models. 45 46 .. _unsaved_device_warning: 47 48 .. warning:: 49 50 OTP devices are inherently stateful. For example, verifying a token is 51 logically a mutating operation on the device, which may involve 52 incrementing a counter or otherwise consuming a token. A device must be 53 committed to the database before it can be used in any way. 54 55 .. attribute:: user 56 57 *ForeignKey*: Foreign key to your user model, as configured by 58 :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User` 59 by default). 60 61 .. attribute:: name 62 63 *CharField*: A human-readable name to help the user identify their 64 devices. 65 66 .. attribute:: confirmed 67 68 *BooleanField*: A boolean value that tells us whether this device has 69 been confirmed as valid. It defaults to ``True``, but subclasses or 70 individual deployments can force it to ``False`` if they wish to create 71 a device and then ask the user for confirmation. As a rule, built-in 72 APIs that enumerate devices will only include those that are confirmed. 73 74 .. attribute:: objects 75 76 A :class:`~authentik.stages.authenticator.models.DeviceManager`. 77 """ 78 79 user = models.ForeignKey( 80 User, 81 help_text="The user that this device belongs to.", 82 on_delete=models.CASCADE, 83 ) 84 85 name = models.CharField(max_length=64, help_text="The human-readable name of this device.") 86 87 confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?") 88 89 last_used = models.DateTimeField(null=True) 90 91 objects = DeviceManager() 92 93 class Meta: 94 abstract = True 95 96 def __str__(self): 97 try: 98 user = self.user 99 except ObjectDoesNotExist: 100 user = None 101 102 return f"{self.name} ({user})" 103 104 @property 105 def persistent_id(self): 106 """ 107 A stable device identifier for forms and APIs. 108 """ 109 return f"{self.model_label()}/{self.id}" 110 111 @classmethod 112 def model_label(cls): 113 """ 114 Returns an identifier for this Django model class. 115 116 This is just the standard "<app_label>.<model_name>" form. 117 118 """ 119 return f"{cls._meta.app_label}.{cls._meta.model_name}" 120 121 @classmethod 122 def from_persistent_id(cls, persistent_id, for_verify=False): 123 """ 124 Loads a device from its persistent id:: 125 126 device == Device.from_persistent_id(device.persistent_id) 127 128 :param bool for_verify: If ``True``, we'll load the device with 129 :meth:`~django.db.models.query.QuerySet.select_for_update` to 130 prevent concurrent verifications from succeeding. In which case, 131 this must be called inside a transaction. 132 133 """ 134 device = None 135 136 try: 137 model_label, device_id = persistent_id.rsplit("/", 1) 138 app_label, model_name = model_label.split(".") 139 140 device_cls = apps.get_model(app_label, model_name) 141 if issubclass(device_cls, Device): 142 device_set = device_cls.objects.filter(id=int(device_id)) 143 if for_verify: 144 device_set = device_set.select_for_update() 145 device = device_set.first() 146 except ValueError, LookupError: 147 pass 148 149 return device 150 151 def is_interactive(self): 152 """ 153 Returns ``True`` if this is an interactive device. The default 154 implementation returns ``True`` if 155 :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been 156 overridden, but subclasses are welcome to provide smarter 157 implementations. 158 159 :rtype: bool 160 """ 161 return not hasattr(self.generate_challenge, "stub") 162 163 def generate_challenge(self): 164 """ 165 Generates a challenge value that the user will need to produce a token. 166 This method is permitted to have side effects, such as transmitting 167 information to the user through some other channel (email or SMS, 168 perhaps). And, of course, some devices may need to commit the 169 challenge to the database. 170 171 :returns: A message to the user. This should be a string that fits 172 comfortably in the template ``'OTP Challenge: {0}'``. This may 173 return ``None`` if this device is not interactive. 174 :rtype: string or ``None`` 175 176 :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should 177 trap ``Exception`` and report it to the user. 178 """ 179 return None 180 181 generate_challenge.stub = True 182 183 def verify_is_allowed(self): 184 """ 185 Checks whether it is permissible to call :meth:`verify_token`. If it is 186 allowed, returns ``(True, None)``. Otherwise returns ``(False, 187 data_dict)``, where ``data_dict`` contains extra information, defined 188 by the implementation. 189 190 This method can be used to implement throttling or locking, for 191 example. Client code should check this method before calling 192 :meth:`verify_token` and report problems to the user. 193 194 To report specific problems, the data dictionary can return include a 195 ``'reason'`` member with a value from the constants in 196 :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member 197 should be provided with an error message. 198 199 :meth:`verify_token` should also call this method and return False if 200 verification is not allowed. 201 202 :rtype: (bool, dict or ``None``) 203 204 """ 205 return (True, None) 206 207 def verify_token(self, token): 208 """ 209 Verifies a token. As a rule, the token should no longer be valid if 210 this returns ``True``. 211 212 :param str token: The OTP token provided by the user. 213 :rtype: bool 214 """ 215 return False 216 217 218class SideChannelDevice(Device): 219 """ 220 Abstract base model for a side-channel :term:`device` attached to a user. 221 222 This model implements token generation, verification and expiration, so the 223 concrete devices only have to implement delivery. 224 225 """ 226 227 token = models.CharField(max_length=16, blank=True, null=True) 228 229 valid_until = models.DateTimeField( 230 default=timezone.now, 231 help_text="The timestamp of the moment of expiry of the saved token.", 232 ) 233 234 class Meta: 235 abstract = True 236 237 def generate_token(self, length=6, valid_secs=300, commit=True): 238 """ 239 Generates a token of the specified length, then sets it on the model 240 and sets the expiration of the token on the model. 241 242 Pass 'commit=False' to avoid calling self.save(). 243 244 :param int length: Number of decimal digits in the generated token. 245 :param int valid_secs: Amount of seconds the token should be valid. 246 :param bool commit: Whether to autosave the generated token. 247 248 """ 249 self.token = random_number_token(length) 250 self.valid_until = timezone.now() + timedelta(seconds=valid_secs) 251 if commit: 252 self.save() 253 254 def verify_token(self, token): 255 """ 256 Verifies a token by content and expiry. 257 258 On success, the token is cleared and the device saved. 259 260 :param str token: The OTP token provided by the user. 261 :rtype: bool 262 263 """ 264 _now = timezone.now() 265 266 if (self.token is not None) and (token == self.token) and (_now < self.valid_until): 267 self.token = None 268 self.valid_until = _now 269 self.save() 270 271 return True 272 return False 273 274 275class VerifyNotAllowed: 276 """ 277 Constants that may be returned in the ``reason`` member of the extra 278 information dictionary returned by 279 :meth:`~authentik.stages.authenticator.models.Device.verify_is_allowed` 280 281 .. data:: N_FAILED_ATTEMPTS 282 283 Indicates that verification is disallowed because of ``n`` successive 284 failed attempts. The data dictionary should include the value of ``n`` 285 in member ``failure_count`` 286 287 """ 288 289 N_FAILED_ATTEMPTS = "N_FAILED_ATTEMPTS" 290 291 292class ThrottlingMixin(models.Model): 293 """ 294 Mixin class for models that want throttling behaviour. 295 296 This implements exponential back-off for verifying tokens. Subclasses must 297 implement :meth:`get_throttle_factor`, and must use the 298 :meth:`verify_is_allowed`, :meth:`throttle_reset` and 299 :meth:`throttle_increment` methods from within their verify_token() method. 300 301 See the implementation of 302 :class:`~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice` for an example. 303 304 """ 305 306 throttling_failure_timestamp = models.DateTimeField( 307 null=True, 308 blank=True, 309 default=None, 310 help_text=( 311 "A timestamp of the last failed verification attempt. " 312 "Null if last attempt succeeded." 313 ), 314 ) 315 316 throttling_failure_count = models.PositiveIntegerField( 317 default=0, help_text="Number of successive failed attempts." 318 ) 319 320 class Meta: 321 abstract = True 322 323 def verify_is_allowed(self): 324 """ 325 If verification is allowed, returns ``(True, None)``. 326 Otherwise, returns ``(False, data_dict)``. 327 328 ``data_dict`` contains further information. Currently it can be:: 329 330 { 331 'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS, 332 'failure_count': n 333 } 334 335 where ``n`` is the number of successive failures. See 336 :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`. 337 338 """ 339 if ( 340 self.throttling_enabled 341 and self.throttling_failure_count > 0 342 and self.throttling_failure_timestamp is not None 343 ): 344 now = timezone.now() 345 delay = (now - self.throttling_failure_timestamp).total_seconds() 346 # Required delays should be 1, 2, 4, 8 ... 347 delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1)) 348 if delay < delay_required: 349 return ( 350 False, 351 { 352 "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS, 353 "failure_count": self.throttling_failure_count, 354 "locked_until": self.throttling_failure_timestamp 355 + timedelta(seconds=delay_required), 356 }, 357 ) 358 359 return super().verify_is_allowed() 360 361 def throttle_reset(self, commit=True): 362 """ 363 Call this method to reset throttling (normally when a verify attempt 364 succeeded). 365 366 Pass 'commit=False' to avoid calling self.save(). 367 368 """ 369 self.throttling_failure_timestamp = None 370 self.throttling_failure_count = 0 371 if commit: 372 self.save() 373 374 def throttle_increment(self, commit=True): 375 """ 376 Call this method to increase throttling (normally when a verify attempt 377 failed). 378 379 Pass 'commit=False' to avoid calling self.save(). 380 381 """ 382 self.throttling_failure_timestamp = timezone.now() 383 self.throttling_failure_count += 1 384 if commit: 385 self.save() 386 387 @cached_property 388 def throttling_enabled(self) -> bool: 389 """Check if throttling is enabled""" 390 return self.get_throttle_factor() > 0 391 392 def get_throttle_factor(self): # pragma: no cover 393 """ 394 This must be implemented to return the throttle factor. 395 396 The number of seconds required between verification attempts will be 397 :math:`c2^{n-1}` where `c` is this factor and `n` is the number of 398 previous failures. A factor of 1 translates to delays of 1, 2, 4, 8, 399 etc. seconds. A factor of 0 disables the throttling. 400 401 Normally this is just a wrapper for a plugin-specific setting like 402 :setting:`OTP_EMAIL_THROTTLE_FACTOR`. 403 404 """ 405 raise NotImplementedError()
17class DeviceManager(models.Manager): 18 """ 19 The :class:`~django.db.models.Manager` object installed as 20 ``Device.objects``. 21 """ 22 23 def devices_for_user(self, user: User, confirmed: bool | None = None): 24 """ 25 Returns a queryset for all devices of this class that belong to the 26 given user. 27 28 :param user: The user. 29 :type user: :class:`~django.contrib.auth.models.User` 30 31 :param confirmed: If ``None``, all matching devices are returned. 32 Otherwise, this can be any true or false value to limit the query 33 to confirmed or unconfirmed devices, respectively. 34 """ 35 devices = self.model.objects.filter(user=user) 36 if confirmed is not None: 37 devices = devices.filter(confirmed=bool(confirmed)) 38 39 return devices
The :class:~django.db.models.Manager object installed as
Device.objects.
23 def devices_for_user(self, user: User, confirmed: bool | None = None): 24 """ 25 Returns a queryset for all devices of this class that belong to the 26 given user. 27 28 :param user: The user. 29 :type user: :class:`~django.contrib.auth.models.User` 30 31 :param confirmed: If ``None``, all matching devices are returned. 32 Otherwise, this can be any true or false value to limit the query 33 to confirmed or unconfirmed devices, respectively. 34 """ 35 devices = self.model.objects.filter(user=user) 36 if confirmed is not None: 37 devices = devices.filter(confirmed=bool(confirmed)) 38 39 return devices
Returns a queryset for all devices of this class that belong to the given user.
:param user: The user.
:type user: :class:~django.contrib.auth.models.User
:param confirmed: If None, all matching devices are returned.
Otherwise, this can be any true or false value to limit the query
to confirmed or unconfirmed devices, respectively.
42class Device(CreatedUpdatedModel): 43 """ 44 Abstract base model for a :term:`device` attached to a user. Plugins must 45 subclass this to define their OTP models. 46 47 .. _unsaved_device_warning: 48 49 .. warning:: 50 51 OTP devices are inherently stateful. For example, verifying a token is 52 logically a mutating operation on the device, which may involve 53 incrementing a counter or otherwise consuming a token. A device must be 54 committed to the database before it can be used in any way. 55 56 .. attribute:: user 57 58 *ForeignKey*: Foreign key to your user model, as configured by 59 :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User` 60 by default). 61 62 .. attribute:: name 63 64 *CharField*: A human-readable name to help the user identify their 65 devices. 66 67 .. attribute:: confirmed 68 69 *BooleanField*: A boolean value that tells us whether this device has 70 been confirmed as valid. It defaults to ``True``, but subclasses or 71 individual deployments can force it to ``False`` if they wish to create 72 a device and then ask the user for confirmation. As a rule, built-in 73 APIs that enumerate devices will only include those that are confirmed. 74 75 .. attribute:: objects 76 77 A :class:`~authentik.stages.authenticator.models.DeviceManager`. 78 """ 79 80 user = models.ForeignKey( 81 User, 82 help_text="The user that this device belongs to.", 83 on_delete=models.CASCADE, 84 ) 85 86 name = models.CharField(max_length=64, help_text="The human-readable name of this device.") 87 88 confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?") 89 90 last_used = models.DateTimeField(null=True) 91 92 objects = DeviceManager() 93 94 class Meta: 95 abstract = True 96 97 def __str__(self): 98 try: 99 user = self.user 100 except ObjectDoesNotExist: 101 user = None 102 103 return f"{self.name} ({user})" 104 105 @property 106 def persistent_id(self): 107 """ 108 A stable device identifier for forms and APIs. 109 """ 110 return f"{self.model_label()}/{self.id}" 111 112 @classmethod 113 def model_label(cls): 114 """ 115 Returns an identifier for this Django model class. 116 117 This is just the standard "<app_label>.<model_name>" form. 118 119 """ 120 return f"{cls._meta.app_label}.{cls._meta.model_name}" 121 122 @classmethod 123 def from_persistent_id(cls, persistent_id, for_verify=False): 124 """ 125 Loads a device from its persistent id:: 126 127 device == Device.from_persistent_id(device.persistent_id) 128 129 :param bool for_verify: If ``True``, we'll load the device with 130 :meth:`~django.db.models.query.QuerySet.select_for_update` to 131 prevent concurrent verifications from succeeding. In which case, 132 this must be called inside a transaction. 133 134 """ 135 device = None 136 137 try: 138 model_label, device_id = persistent_id.rsplit("/", 1) 139 app_label, model_name = model_label.split(".") 140 141 device_cls = apps.get_model(app_label, model_name) 142 if issubclass(device_cls, Device): 143 device_set = device_cls.objects.filter(id=int(device_id)) 144 if for_verify: 145 device_set = device_set.select_for_update() 146 device = device_set.first() 147 except ValueError, LookupError: 148 pass 149 150 return device 151 152 def is_interactive(self): 153 """ 154 Returns ``True`` if this is an interactive device. The default 155 implementation returns ``True`` if 156 :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been 157 overridden, but subclasses are welcome to provide smarter 158 implementations. 159 160 :rtype: bool 161 """ 162 return not hasattr(self.generate_challenge, "stub") 163 164 def generate_challenge(self): 165 """ 166 Generates a challenge value that the user will need to produce a token. 167 This method is permitted to have side effects, such as transmitting 168 information to the user through some other channel (email or SMS, 169 perhaps). And, of course, some devices may need to commit the 170 challenge to the database. 171 172 :returns: A message to the user. This should be a string that fits 173 comfortably in the template ``'OTP Challenge: {0}'``. This may 174 return ``None`` if this device is not interactive. 175 :rtype: string or ``None`` 176 177 :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should 178 trap ``Exception`` and report it to the user. 179 """ 180 return None 181 182 generate_challenge.stub = True 183 184 def verify_is_allowed(self): 185 """ 186 Checks whether it is permissible to call :meth:`verify_token`. If it is 187 allowed, returns ``(True, None)``. Otherwise returns ``(False, 188 data_dict)``, where ``data_dict`` contains extra information, defined 189 by the implementation. 190 191 This method can be used to implement throttling or locking, for 192 example. Client code should check this method before calling 193 :meth:`verify_token` and report problems to the user. 194 195 To report specific problems, the data dictionary can return include a 196 ``'reason'`` member with a value from the constants in 197 :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member 198 should be provided with an error message. 199 200 :meth:`verify_token` should also call this method and return False if 201 verification is not allowed. 202 203 :rtype: (bool, dict or ``None``) 204 205 """ 206 return (True, None) 207 208 def verify_token(self, token): 209 """ 210 Verifies a token. As a rule, the token should no longer be valid if 211 this returns ``True``. 212 213 :param str token: The OTP token provided by the user. 214 :rtype: bool 215 """ 216 return False
Abstract base model for a :term:device attached to a user. Plugins must
subclass this to define their OTP models.
.. _unsaved_device_warning:
.. warning::
OTP devices are inherently stateful. For example, verifying a token is
logically a mutating operation on the device, which may involve
incrementing a counter or otherwise consuming a token. A device must be
committed to the database before it can be used in any way.
.. attribute:: user
*ForeignKey*: Foreign key to your user model, as configured by
:setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
by default).
.. attribute:: name
*CharField*: A human-readable name to help the user identify their
devices.
.. attribute:: confirmed
*BooleanField*: A boolean value that tells us whether this device has
been confirmed as valid. It defaults to ``True``, but subclasses or
individual deployments can force it to ``False`` if they wish to create
a device and then ask the user for confirmation. As a rule, built-in
APIs that enumerate devices will only include those that are confirmed.
.. attribute:: objects
A :class:`~authentik.stages.authenticator.models.DeviceManager`.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
105 @property 106 def persistent_id(self): 107 """ 108 A stable device identifier for forms and APIs. 109 """ 110 return f"{self.model_label()}/{self.id}"
A stable device identifier for forms and APIs.
112 @classmethod 113 def model_label(cls): 114 """ 115 Returns an identifier for this Django model class. 116 117 This is just the standard "<app_label>.<model_name>" form. 118 119 """ 120 return f"{cls._meta.app_label}.{cls._meta.model_name}"
Returns an identifier for this Django model class.
This is just the standard "
122 @classmethod 123 def from_persistent_id(cls, persistent_id, for_verify=False): 124 """ 125 Loads a device from its persistent id:: 126 127 device == Device.from_persistent_id(device.persistent_id) 128 129 :param bool for_verify: If ``True``, we'll load the device with 130 :meth:`~django.db.models.query.QuerySet.select_for_update` to 131 prevent concurrent verifications from succeeding. In which case, 132 this must be called inside a transaction. 133 134 """ 135 device = None 136 137 try: 138 model_label, device_id = persistent_id.rsplit("/", 1) 139 app_label, model_name = model_label.split(".") 140 141 device_cls = apps.get_model(app_label, model_name) 142 if issubclass(device_cls, Device): 143 device_set = device_cls.objects.filter(id=int(device_id)) 144 if for_verify: 145 device_set = device_set.select_for_update() 146 device = device_set.first() 147 except ValueError, LookupError: 148 pass 149 150 return device
Loads a device from its persistent id::
device == Device.from_persistent_id(device.persistent_id)
:param bool for_verify: If True, we'll load the device with
:meth:~django.db.models.query.QuerySet.select_for_update to
prevent concurrent verifications from succeeding. In which case,
this must be called inside a transaction.
152 def is_interactive(self): 153 """ 154 Returns ``True`` if this is an interactive device. The default 155 implementation returns ``True`` if 156 :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been 157 overridden, but subclasses are welcome to provide smarter 158 implementations. 159 160 :rtype: bool 161 """ 162 return not hasattr(self.generate_challenge, "stub")
Returns True if this is an interactive device. The default
implementation returns True if
:meth:~authentik.stages.authenticator.models.Device.generate_challenge has been
overridden, but subclasses are welcome to provide smarter
implementations.
:rtype: bool
164 def generate_challenge(self): 165 """ 166 Generates a challenge value that the user will need to produce a token. 167 This method is permitted to have side effects, such as transmitting 168 information to the user through some other channel (email or SMS, 169 perhaps). And, of course, some devices may need to commit the 170 challenge to the database. 171 172 :returns: A message to the user. This should be a string that fits 173 comfortably in the template ``'OTP Challenge: {0}'``. This may 174 return ``None`` if this device is not interactive. 175 :rtype: string or ``None`` 176 177 :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should 178 trap ``Exception`` and report it to the user. 179 """ 180 return None
Generates a challenge value that the user will need to produce a token. This method is permitted to have side effects, such as transmitting information to the user through some other channel (email or SMS, perhaps). And, of course, some devices may need to commit the challenge to the database.
:returns: A message to the user. This should be a string that fits
comfortably in the template 'OTP Challenge: {0}'. This may
return None if this device is not interactive.
:rtype: string or None
:raises: Any :exc:~exceptions.Exception is permitted. Callers should
trap Exception and report it to the user.
184 def verify_is_allowed(self): 185 """ 186 Checks whether it is permissible to call :meth:`verify_token`. If it is 187 allowed, returns ``(True, None)``. Otherwise returns ``(False, 188 data_dict)``, where ``data_dict`` contains extra information, defined 189 by the implementation. 190 191 This method can be used to implement throttling or locking, for 192 example. Client code should check this method before calling 193 :meth:`verify_token` and report problems to the user. 194 195 To report specific problems, the data dictionary can return include a 196 ``'reason'`` member with a value from the constants in 197 :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member 198 should be provided with an error message. 199 200 :meth:`verify_token` should also call this method and return False if 201 verification is not allowed. 202 203 :rtype: (bool, dict or ``None``) 204 205 """ 206 return (True, None)
Checks whether it is permissible to call :meth:verify_token. If it is
allowed, returns (True, None). Otherwise returns (False,
data_dict), where data_dict contains extra information, defined
by the implementation.
This method can be used to implement throttling or locking, for
example. Client code should check this method before calling
:meth:verify_token and report problems to the user.
To report specific problems, the data dictionary can return include a
'reason' member with a value from the constants in
:class:VerifyNotAllowed. Otherwise, an 'error_message' member
should be provided with an error message.
:meth:verify_token should also call this method and return False if
verification is not allowed.
:rtype: (bool, dict or None)
208 def verify_token(self, token): 209 """ 210 Verifies a token. As a rule, the token should no longer be valid if 211 this returns ``True``. 212 213 :param str token: The OTP token provided by the user. 214 :rtype: bool 215 """ 216 return False
Verifies a token. As a rule, the token should no longer be valid if
this returns True.
:param str token: The OTP token provided by the user. :rtype: bool
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
219class SideChannelDevice(Device): 220 """ 221 Abstract base model for a side-channel :term:`device` attached to a user. 222 223 This model implements token generation, verification and expiration, so the 224 concrete devices only have to implement delivery. 225 226 """ 227 228 token = models.CharField(max_length=16, blank=True, null=True) 229 230 valid_until = models.DateTimeField( 231 default=timezone.now, 232 help_text="The timestamp of the moment of expiry of the saved token.", 233 ) 234 235 class Meta: 236 abstract = True 237 238 def generate_token(self, length=6, valid_secs=300, commit=True): 239 """ 240 Generates a token of the specified length, then sets it on the model 241 and sets the expiration of the token on the model. 242 243 Pass 'commit=False' to avoid calling self.save(). 244 245 :param int length: Number of decimal digits in the generated token. 246 :param int valid_secs: Amount of seconds the token should be valid. 247 :param bool commit: Whether to autosave the generated token. 248 249 """ 250 self.token = random_number_token(length) 251 self.valid_until = timezone.now() + timedelta(seconds=valid_secs) 252 if commit: 253 self.save() 254 255 def verify_token(self, token): 256 """ 257 Verifies a token by content and expiry. 258 259 On success, the token is cleared and the device saved. 260 261 :param str token: The OTP token provided by the user. 262 :rtype: bool 263 264 """ 265 _now = timezone.now() 266 267 if (self.token is not None) and (token == self.token) and (_now < self.valid_until): 268 self.token = None 269 self.valid_until = _now 270 self.save() 271 272 return True 273 return False
Abstract base model for a side-channel :term:device attached to a user.
This model implements token generation, verification and expiration, so the concrete devices only have to implement delivery.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
238 def generate_token(self, length=6, valid_secs=300, commit=True): 239 """ 240 Generates a token of the specified length, then sets it on the model 241 and sets the expiration of the token on the model. 242 243 Pass 'commit=False' to avoid calling self.save(). 244 245 :param int length: Number of decimal digits in the generated token. 246 :param int valid_secs: Amount of seconds the token should be valid. 247 :param bool commit: Whether to autosave the generated token. 248 249 """ 250 self.token = random_number_token(length) 251 self.valid_until = timezone.now() + timedelta(seconds=valid_secs) 252 if commit: 253 self.save()
Generates a token of the specified length, then sets it on the model and sets the expiration of the token on the model.
Pass 'commit=False' to avoid calling self.save().
:param int length: Number of decimal digits in the generated token. :param int valid_secs: Amount of seconds the token should be valid. :param bool commit: Whether to autosave the generated token.
255 def verify_token(self, token): 256 """ 257 Verifies a token by content and expiry. 258 259 On success, the token is cleared and the device saved. 260 261 :param str token: The OTP token provided by the user. 262 :rtype: bool 263 264 """ 265 _now = timezone.now() 266 267 if (self.token is not None) and (token == self.token) and (_now < self.valid_until): 268 self.token = None 269 self.valid_until = _now 270 self.save() 271 272 return True 273 return False
Verifies a token by content and expiry.
On success, the token is cleared and the device saved.
:param str token: The OTP token provided by the user. :rtype: bool
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
276class VerifyNotAllowed: 277 """ 278 Constants that may be returned in the ``reason`` member of the extra 279 information dictionary returned by 280 :meth:`~authentik.stages.authenticator.models.Device.verify_is_allowed` 281 282 .. data:: N_FAILED_ATTEMPTS 283 284 Indicates that verification is disallowed because of ``n`` successive 285 failed attempts. The data dictionary should include the value of ``n`` 286 in member ``failure_count`` 287 288 """ 289 290 N_FAILED_ATTEMPTS = "N_FAILED_ATTEMPTS"
Constants that may be returned in the reason member of the extra
information dictionary returned by
:meth:~authentik.stages.authenticator.models.Device.verify_is_allowed
.. data:: N_FAILED_ATTEMPTS
Indicates that verification is disallowed because of n successive
failed attempts. The data dictionary should include the value of n
in member failure_count
293class ThrottlingMixin(models.Model): 294 """ 295 Mixin class for models that want throttling behaviour. 296 297 This implements exponential back-off for verifying tokens. Subclasses must 298 implement :meth:`get_throttle_factor`, and must use the 299 :meth:`verify_is_allowed`, :meth:`throttle_reset` and 300 :meth:`throttle_increment` methods from within their verify_token() method. 301 302 See the implementation of 303 :class:`~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice` for an example. 304 305 """ 306 307 throttling_failure_timestamp = models.DateTimeField( 308 null=True, 309 blank=True, 310 default=None, 311 help_text=( 312 "A timestamp of the last failed verification attempt. " 313 "Null if last attempt succeeded." 314 ), 315 ) 316 317 throttling_failure_count = models.PositiveIntegerField( 318 default=0, help_text="Number of successive failed attempts." 319 ) 320 321 class Meta: 322 abstract = True 323 324 def verify_is_allowed(self): 325 """ 326 If verification is allowed, returns ``(True, None)``. 327 Otherwise, returns ``(False, data_dict)``. 328 329 ``data_dict`` contains further information. Currently it can be:: 330 331 { 332 'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS, 333 'failure_count': n 334 } 335 336 where ``n`` is the number of successive failures. See 337 :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`. 338 339 """ 340 if ( 341 self.throttling_enabled 342 and self.throttling_failure_count > 0 343 and self.throttling_failure_timestamp is not None 344 ): 345 now = timezone.now() 346 delay = (now - self.throttling_failure_timestamp).total_seconds() 347 # Required delays should be 1, 2, 4, 8 ... 348 delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1)) 349 if delay < delay_required: 350 return ( 351 False, 352 { 353 "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS, 354 "failure_count": self.throttling_failure_count, 355 "locked_until": self.throttling_failure_timestamp 356 + timedelta(seconds=delay_required), 357 }, 358 ) 359 360 return super().verify_is_allowed() 361 362 def throttle_reset(self, commit=True): 363 """ 364 Call this method to reset throttling (normally when a verify attempt 365 succeeded). 366 367 Pass 'commit=False' to avoid calling self.save(). 368 369 """ 370 self.throttling_failure_timestamp = None 371 self.throttling_failure_count = 0 372 if commit: 373 self.save() 374 375 def throttle_increment(self, commit=True): 376 """ 377 Call this method to increase throttling (normally when a verify attempt 378 failed). 379 380 Pass 'commit=False' to avoid calling self.save(). 381 382 """ 383 self.throttling_failure_timestamp = timezone.now() 384 self.throttling_failure_count += 1 385 if commit: 386 self.save() 387 388 @cached_property 389 def throttling_enabled(self) -> bool: 390 """Check if throttling is enabled""" 391 return self.get_throttle_factor() > 0 392 393 def get_throttle_factor(self): # pragma: no cover 394 """ 395 This must be implemented to return the throttle factor. 396 397 The number of seconds required between verification attempts will be 398 :math:`c2^{n-1}` where `c` is this factor and `n` is the number of 399 previous failures. A factor of 1 translates to delays of 1, 2, 4, 8, 400 etc. seconds. A factor of 0 disables the throttling. 401 402 Normally this is just a wrapper for a plugin-specific setting like 403 :setting:`OTP_EMAIL_THROTTLE_FACTOR`. 404 405 """ 406 raise NotImplementedError()
Mixin class for models that want throttling behaviour.
This implements exponential back-off for verifying tokens. Subclasses must
implement :meth:get_throttle_factor, and must use the
:meth:verify_is_allowed, :meth:throttle_reset and
:meth:throttle_increment methods from within their verify_token() method.
See the implementation of
:class:~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice for an example.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
324 def verify_is_allowed(self): 325 """ 326 If verification is allowed, returns ``(True, None)``. 327 Otherwise, returns ``(False, data_dict)``. 328 329 ``data_dict`` contains further information. Currently it can be:: 330 331 { 332 'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS, 333 'failure_count': n 334 } 335 336 where ``n`` is the number of successive failures. See 337 :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`. 338 339 """ 340 if ( 341 self.throttling_enabled 342 and self.throttling_failure_count > 0 343 and self.throttling_failure_timestamp is not None 344 ): 345 now = timezone.now() 346 delay = (now - self.throttling_failure_timestamp).total_seconds() 347 # Required delays should be 1, 2, 4, 8 ... 348 delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1)) 349 if delay < delay_required: 350 return ( 351 False, 352 { 353 "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS, 354 "failure_count": self.throttling_failure_count, 355 "locked_until": self.throttling_failure_timestamp 356 + timedelta(seconds=delay_required), 357 }, 358 ) 359 360 return super().verify_is_allowed()
If verification is allowed, returns (True, None).
Otherwise, returns (False, data_dict).
data_dict contains further information. Currently it can be::
{
'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
'failure_count': n
}
where n is the number of successive failures. See
:class:~authentik.stages.authenticator.models.VerifyNotAllowed.
362 def throttle_reset(self, commit=True): 363 """ 364 Call this method to reset throttling (normally when a verify attempt 365 succeeded). 366 367 Pass 'commit=False' to avoid calling self.save(). 368 369 """ 370 self.throttling_failure_timestamp = None 371 self.throttling_failure_count = 0 372 if commit: 373 self.save()
Call this method to reset throttling (normally when a verify attempt succeeded).
Pass 'commit=False' to avoid calling self.save().
375 def throttle_increment(self, commit=True): 376 """ 377 Call this method to increase throttling (normally when a verify attempt 378 failed). 379 380 Pass 'commit=False' to avoid calling self.save(). 381 382 """ 383 self.throttling_failure_timestamp = timezone.now() 384 self.throttling_failure_count += 1 385 if commit: 386 self.save()
Call this method to increase throttling (normally when a verify attempt failed).
Pass 'commit=False' to avoid calling self.save().
393 def get_throttle_factor(self): # pragma: no cover 394 """ 395 This must be implemented to return the throttle factor. 396 397 The number of seconds required between verification attempts will be 398 :math:`c2^{n-1}` where `c` is this factor and `n` is the number of 399 previous failures. A factor of 1 translates to delays of 1, 2, 4, 8, 400 etc. seconds. A factor of 0 disables the throttling. 401 402 Normally this is just a wrapper for a plugin-specific setting like 403 :setting:`OTP_EMAIL_THROTTLE_FACTOR`. 404 405 """ 406 raise NotImplementedError()
This must be implemented to return the throttle factor.
The number of seconds required between verification attempts will be
:math:c2^{n-1} where c is this factor and n is the number of
previous failures. A factor of 1 translates to delays of 1, 2, 4, 8,
etc. seconds. A factor of 0 disables the throttling.
Normally this is just a wrapper for a plugin-specific setting like
:setting:OTP_EMAIL_THROTTLE_FACTOR.