authentik.stages.authenticator.models

Base authenticator models

  1"""Base authenticator models"""
  2
  3from datetime import timedelta
  4
  5from django.apps import apps
  6from django.core.exceptions import ObjectDoesNotExist
  7from django.db import models
  8from django.utils import timezone
  9from django.utils.functional import cached_property
 10
 11from authentik.core.models import User
 12from authentik.lib.models import CreatedUpdatedModel
 13from authentik.stages.authenticator.util import random_number_token
 14
 15
 16class DeviceManager(models.Manager):
 17    """
 18    The :class:`~django.db.models.Manager` object installed as
 19    ``Device.objects``.
 20    """
 21
 22    def devices_for_user(self, user: User, confirmed: bool | None = None):
 23        """
 24        Returns a queryset for all devices of this class that belong to the
 25        given user.
 26
 27        :param user: The user.
 28        :type user: :class:`~django.contrib.auth.models.User`
 29
 30        :param confirmed: If ``None``, all matching devices are returned.
 31            Otherwise, this can be any true or false value to limit the query
 32            to confirmed or unconfirmed devices, respectively.
 33        """
 34        devices = self.model.objects.filter(user=user)
 35        if confirmed is not None:
 36            devices = devices.filter(confirmed=bool(confirmed))
 37
 38        return devices
 39
 40
 41class Device(CreatedUpdatedModel):
 42    """
 43    Abstract base model for a :term:`device` attached to a user. Plugins must
 44    subclass this to define their OTP models.
 45
 46    .. _unsaved_device_warning:
 47
 48    .. warning::
 49
 50        OTP devices are inherently stateful. For example, verifying a token is
 51        logically a mutating operation on the device, which may involve
 52        incrementing a counter or otherwise consuming a token. A device must be
 53        committed to the database before it can be used in any way.
 54
 55    .. attribute:: user
 56
 57        *ForeignKey*: Foreign key to your user model, as configured by
 58        :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
 59        by default).
 60
 61    .. attribute:: name
 62
 63        *CharField*: A human-readable name to help the user identify their
 64        devices.
 65
 66    .. attribute:: confirmed
 67
 68        *BooleanField*: A boolean value that tells us whether this device has
 69        been confirmed as valid. It defaults to ``True``, but subclasses or
 70        individual deployments can force it to ``False`` if they wish to create
 71        a device and then ask the user for confirmation. As a rule, built-in
 72        APIs that enumerate devices will only include those that are confirmed.
 73
 74    .. attribute:: objects
 75
 76        A :class:`~authentik.stages.authenticator.models.DeviceManager`.
 77    """
 78
 79    user = models.ForeignKey(
 80        User,
 81        help_text="The user that this device belongs to.",
 82        on_delete=models.CASCADE,
 83    )
 84
 85    name = models.CharField(max_length=64, help_text="The human-readable name of this device.")
 86
 87    confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?")
 88
 89    last_used = models.DateTimeField(null=True)
 90
 91    objects = DeviceManager()
 92
 93    class Meta:
 94        abstract = True
 95
 96    def __str__(self):
 97        try:
 98            user = self.user
 99        except ObjectDoesNotExist:
100            user = None
101
102        return f"{self.name} ({user})"
103
104    @property
105    def persistent_id(self):
106        """
107        A stable device identifier for forms and APIs.
108        """
109        return f"{self.model_label()}/{self.id}"
110
111    @classmethod
112    def model_label(cls):
113        """
114        Returns an identifier for this Django model class.
115
116        This is just the standard "<app_label>.<model_name>" form.
117
118        """
119        return f"{cls._meta.app_label}.{cls._meta.model_name}"
120
121    @classmethod
122    def from_persistent_id(cls, persistent_id, for_verify=False):
123        """
124        Loads a device from its persistent id::
125
126            device == Device.from_persistent_id(device.persistent_id)
127
128        :param bool for_verify: If ``True``, we'll load the device with
129            :meth:`~django.db.models.query.QuerySet.select_for_update` to
130            prevent concurrent verifications from succeeding. In which case,
131            this must be called inside a transaction.
132
133        """
134        device = None
135
136        try:
137            model_label, device_id = persistent_id.rsplit("/", 1)
138            app_label, model_name = model_label.split(".")
139
140            device_cls = apps.get_model(app_label, model_name)
141            if issubclass(device_cls, Device):
142                device_set = device_cls.objects.filter(id=int(device_id))
143                if for_verify:
144                    device_set = device_set.select_for_update()
145                device = device_set.first()
146        except ValueError, LookupError:
147            pass
148
149        return device
150
151    def is_interactive(self):
152        """
153        Returns ``True`` if this is an interactive device. The default
154        implementation returns ``True`` if
155        :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been
156        overridden, but subclasses are welcome to provide smarter
157        implementations.
158
159        :rtype: bool
160        """
161        return not hasattr(self.generate_challenge, "stub")
162
163    def generate_challenge(self):
164        """
165        Generates a challenge value that the user will need to produce a token.
166        This method is permitted to have side effects, such as transmitting
167        information to the user through some other channel (email or SMS,
168        perhaps). And, of course, some devices may need to commit the
169        challenge to the database.
170
171        :returns: A message to the user. This should be a string that fits
172            comfortably in the template ``'OTP Challenge: {0}'``. This may
173            return ``None`` if this device is not interactive.
174        :rtype: string or ``None``
175
176        :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
177            trap ``Exception`` and report it to the user.
178        """
179        return None
180
181    generate_challenge.stub = True
182
183    def verify_is_allowed(self):
184        """
185        Checks whether it is permissible to call :meth:`verify_token`. If it is
186        allowed, returns ``(True, None)``. Otherwise returns ``(False,
187        data_dict)``, where ``data_dict`` contains extra information, defined
188        by the implementation.
189
190        This method can be used to implement throttling or locking, for
191        example. Client code should check this method before calling
192        :meth:`verify_token` and report problems to the user.
193
194        To report specific problems, the data dictionary can return include a
195        ``'reason'`` member with a value from the constants in
196        :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
197        should be provided with an error message.
198
199        :meth:`verify_token` should also call this method and return False if
200        verification is not allowed.
201
202        :rtype: (bool, dict or ``None``)
203
204        """
205        return (True, None)
206
207    def verify_token(self, token):
208        """
209        Verifies a token. As a rule, the token should no longer be valid if
210        this returns ``True``.
211
212        :param str token: The OTP token provided by the user.
213        :rtype: bool
214        """
215        return False
216
217
class SideChannelDevice(Device):
    """
    Abstract base for devices whose token reaches the user over a side channel.

    Generating, checking and expiring the token is handled here; a concrete
    subclass is left with delivering the token to the user.

    """

    token = models.CharField(max_length=16, blank=True, null=True)

    valid_until = models.DateTimeField(
        default=timezone.now,
        help_text="The timestamp of the moment of expiry of the saved token.",
    )

    class Meta:
        abstract = True

    def generate_token(self, length=6, valid_secs=300, commit=True):
        """
        Store a fresh random token on the device together with its expiry.

        :param int length: Number of decimal digits in the generated token.
        :param int valid_secs: Lifetime of the token in seconds, counted from
            the moment of this call.
        :param bool commit: Save the device afterwards; pass ``False`` to skip
            the call to ``self.save()``.

        """
        self.token = random_number_token(length)
        lifetime = timedelta(seconds=valid_secs)
        self.valid_until = timezone.now() + lifetime
        if not commit:
            return
        self.save()

    def verify_token(self, token):
        """
        Check ``token`` against the stored token and its expiry.

        A match consumes the token: it is cleared, the expiry is set to the
        current time and the device is saved.

        :param str token: The OTP token provided by the user.
        :rtype: bool

        """
        checked_at = timezone.now()

        # Reject when nothing is stored, the value differs, or it has expired.
        if self.token is None:
            return False
        if token != self.token:
            return False
        if not checked_at < self.valid_until:
            return False

        self.token = None
        self.valid_until = checked_at
        self.save()
        return True
273
274
class VerifyNotAllowed:
    """
    Values for the ``reason`` member of the data dictionary returned by
    :meth:`~authentik.stages.authenticator.models.Device.verify_is_allowed`
    when verification is refused.

    .. data:: N_FAILED_ATTEMPTS

       Verification is refused because of ``n`` successive failed attempts.
       The data dictionary should carry ``n`` in its ``failure_count``
       member.

    """

    N_FAILED_ATTEMPTS = "N_FAILED_ATTEMPTS"
290
291
class ThrottlingMixin(models.Model):
    """
    Mixin that adds throttling of token verification to a model.

    Failed attempts are answered with exponential back-off. A subclass has to
    provide :meth:`get_throttle_factor` and is expected to call
    :meth:`verify_is_allowed`, :meth:`throttle_reset` and
    :meth:`throttle_increment` from inside its own verify_token() method.

    See the implementation of
    :class:`~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice` for an example.

    """

    throttling_failure_timestamp = models.DateTimeField(
        null=True,
        blank=True,
        default=None,
        help_text=(
            "A timestamp of the last failed verification attempt. "
            "Null if last attempt succeeded."
        ),
    )

    throttling_failure_count = models.PositiveIntegerField(
        default=0, help_text="Number of successive failed attempts."
    )

    class Meta:
        abstract = True

    def verify_is_allowed(self):
        """
        Report whether a verification attempt may be made right now.

        Returns ``(False, data_dict)`` while the back-off period after the
        last failure is still running, where ``data_dict`` is::

            {
                'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
                'failure_count': n,
                'locked_until': <datetime>
            }

        ``n`` is the number of successive failures and ``locked_until`` is the
        time of the last failure plus the required delay. See
        :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`.

        In every other case the decision is delegated to the next
        ``verify_is_allowed`` in the MRO.

        """
        has_recorded_failure = (
            self.throttling_enabled
            and self.throttling_failure_count > 0
            and self.throttling_failure_timestamp is not None
        )
        if has_recorded_failure:
            last_failure = self.throttling_failure_timestamp
            elapsed = (timezone.now() - last_failure).total_seconds()
            # The wait doubles with each failure: factor * 1, 2, 4, 8 ...
            backoff = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
            if elapsed < backoff:
                details = {
                    "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS,
                    "failure_count": self.throttling_failure_count,
                    "locked_until": last_failure + timedelta(seconds=backoff),
                }
                return (False, details)

        return super().verify_is_allowed()

    def throttle_reset(self, commit=True):
        """
        Clear the throttling state (normally after a successful verification).

        :param bool commit: Save the model afterwards; pass ``False`` to skip
            the call to ``self.save()``.

        """
        self.throttling_failure_timestamp = None
        self.throttling_failure_count = 0
        if not commit:
            return
        self.save()

    def throttle_increment(self, commit=True):
        """
        Record one more failure (normally after a failed verification): the
        failure timestamp becomes the current time and the counter grows by one.

        :param bool commit: Save the model afterwards; pass ``False`` to skip
            the call to ``self.save()``.

        """
        self.throttling_failure_timestamp = timezone.now()
        self.throttling_failure_count = self.throttling_failure_count + 1
        if not commit:
            return
        self.save()

    @cached_property
    def throttling_enabled(self) -> bool:
        """Whether throttling is active, i.e. the throttle factor is positive"""
        factor = self.get_throttle_factor()
        return factor > 0

    def get_throttle_factor(self):  # pragma: no cover
        """
        Return the throttle factor; subclasses must override this.

        The number of seconds required between verification attempts will be
        :math:`c2^{n-1}` where `c` is this factor and `n` is the number of
        previous failures. A factor of 1 translates to delays of 1, 2, 4, 8,
        etc. seconds. A factor of 0 disables the throttling.

        Normally this is just a wrapper for a plugin-specific setting like
        :setting:`OTP_EMAIL_THROTTLE_FACTOR`.

        :raises NotImplementedError: Always, in this base implementation.

        """
        raise NotImplementedError()
class DeviceManager(django.db.models.manager.Manager):
17class DeviceManager(models.Manager):
18    """
19    The :class:`~django.db.models.Manager` object installed as
20    ``Device.objects``.
21    """
22
23    def devices_for_user(self, user: User, confirmed: bool | None = None):
24        """
25        Returns a queryset for all devices of this class that belong to the
26        given user.
27
28        :param user: The user.
29        :type user: :class:`~django.contrib.auth.models.User`
30
31        :param confirmed: If ``None``, all matching devices are returned.
32            Otherwise, this can be any true or false value to limit the query
33            to confirmed or unconfirmed devices, respectively.
34        """
35        devices = self.model.objects.filter(user=user)
36        if confirmed is not None:
37            devices = devices.filter(confirmed=bool(confirmed))
38
39        return devices

The :class:~django.db.models.Manager object installed as Device.objects.

def devices_for_user( self, user: authentik.core.models.User, confirmed: bool | None = None):
23    def devices_for_user(self, user: User, confirmed: bool | None = None):
24        """
25        Returns a queryset for all devices of this class that belong to the
26        given user.
27
28        :param user: The user.
29        :type user: :class:`~django.contrib.auth.models.User`
30
31        :param confirmed: If ``None``, all matching devices are returned.
32            Otherwise, this can be any true or false value to limit the query
33            to confirmed or unconfirmed devices, respectively.
34        """
35        devices = self.model.objects.filter(user=user)
36        if confirmed is not None:
37            devices = devices.filter(confirmed=bool(confirmed))
38
39        return devices

Returns a queryset for all devices of this class that belong to the given user.

:param user: The user. :type user: :class:~django.contrib.auth.models.User

:param confirmed: If None, all matching devices are returned. Otherwise, this can be any true or false value to limit the query to confirmed or unconfirmed devices, respectively.

class Device(authentik.lib.models.CreatedUpdatedModel):
 42class Device(CreatedUpdatedModel):
 43    """
 44    Abstract base model for a :term:`device` attached to a user. Plugins must
 45    subclass this to define their OTP models.
 46
 47    .. _unsaved_device_warning:
 48
 49    .. warning::
 50
 51        OTP devices are inherently stateful. For example, verifying a token is
 52        logically a mutating operation on the device, which may involve
 53        incrementing a counter or otherwise consuming a token. A device must be
 54        committed to the database before it can be used in any way.
 55
 56    .. attribute:: user
 57
 58        *ForeignKey*: Foreign key to your user model, as configured by
 59        :setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
 60        by default).
 61
 62    .. attribute:: name
 63
 64        *CharField*: A human-readable name to help the user identify their
 65        devices.
 66
 67    .. attribute:: confirmed
 68
 69        *BooleanField*: A boolean value that tells us whether this device has
 70        been confirmed as valid. It defaults to ``True``, but subclasses or
 71        individual deployments can force it to ``False`` if they wish to create
 72        a device and then ask the user for confirmation. As a rule, built-in
 73        APIs that enumerate devices will only include those that are confirmed.
 74
 75    .. attribute:: objects
 76
 77        A :class:`~authentik.stages.authenticator.models.DeviceManager`.
 78    """
 79
 80    user = models.ForeignKey(
 81        User,
 82        help_text="The user that this device belongs to.",
 83        on_delete=models.CASCADE,
 84    )
 85
 86    name = models.CharField(max_length=64, help_text="The human-readable name of this device.")
 87
 88    confirmed = models.BooleanField(default=True, help_text="Is this device ready for use?")
 89
 90    last_used = models.DateTimeField(null=True)
 91
 92    objects = DeviceManager()
 93
 94    class Meta:
 95        abstract = True
 96
 97    def __str__(self):
 98        try:
 99            user = self.user
100        except ObjectDoesNotExist:
101            user = None
102
103        return f"{self.name} ({user})"
104
105    @property
106    def persistent_id(self):
107        """
108        A stable device identifier for forms and APIs.
109        """
110        return f"{self.model_label()}/{self.id}"
111
112    @classmethod
113    def model_label(cls):
114        """
115        Returns an identifier for this Django model class.
116
117        This is just the standard "<app_label>.<model_name>" form.
118
119        """
120        return f"{cls._meta.app_label}.{cls._meta.model_name}"
121
122    @classmethod
123    def from_persistent_id(cls, persistent_id, for_verify=False):
124        """
125        Loads a device from its persistent id::
126
127            device == Device.from_persistent_id(device.persistent_id)
128
129        :param bool for_verify: If ``True``, we'll load the device with
130            :meth:`~django.db.models.query.QuerySet.select_for_update` to
131            prevent concurrent verifications from succeeding. In which case,
132            this must be called inside a transaction.
133
134        """
135        device = None
136
137        try:
138            model_label, device_id = persistent_id.rsplit("/", 1)
139            app_label, model_name = model_label.split(".")
140
141            device_cls = apps.get_model(app_label, model_name)
142            if issubclass(device_cls, Device):
143                device_set = device_cls.objects.filter(id=int(device_id))
144                if for_verify:
145                    device_set = device_set.select_for_update()
146                device = device_set.first()
147        except ValueError, LookupError:
148            pass
149
150        return device
151
152    def is_interactive(self):
153        """
154        Returns ``True`` if this is an interactive device. The default
155        implementation returns ``True`` if
156        :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been
157        overridden, but subclasses are welcome to provide smarter
158        implementations.
159
160        :rtype: bool
161        """
162        return not hasattr(self.generate_challenge, "stub")
163
164    def generate_challenge(self):
165        """
166        Generates a challenge value that the user will need to produce a token.
167        This method is permitted to have side effects, such as transmitting
168        information to the user through some other channel (email or SMS,
169        perhaps). And, of course, some devices may need to commit the
170        challenge to the database.
171
172        :returns: A message to the user. This should be a string that fits
173            comfortably in the template ``'OTP Challenge: {0}'``. This may
174            return ``None`` if this device is not interactive.
175        :rtype: string or ``None``
176
177        :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
178            trap ``Exception`` and report it to the user.
179        """
180        return None
181
182    generate_challenge.stub = True
183
184    def verify_is_allowed(self):
185        """
186        Checks whether it is permissible to call :meth:`verify_token`. If it is
187        allowed, returns ``(True, None)``. Otherwise returns ``(False,
188        data_dict)``, where ``data_dict`` contains extra information, defined
189        by the implementation.
190
191        This method can be used to implement throttling or locking, for
192        example. Client code should check this method before calling
193        :meth:`verify_token` and report problems to the user.
194
195        To report specific problems, the data dictionary can return include a
196        ``'reason'`` member with a value from the constants in
197        :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
198        should be provided with an error message.
199
200        :meth:`verify_token` should also call this method and return False if
201        verification is not allowed.
202
203        :rtype: (bool, dict or ``None``)
204
205        """
206        return (True, None)
207
208    def verify_token(self, token):
209        """
210        Verifies a token. As a rule, the token should no longer be valid if
211        this returns ``True``.
212
213        :param str token: The OTP token provided by the user.
214        :rtype: bool
215        """
216        return False

Abstract base model for a :term:device attached to a user. Plugins must subclass this to define their OTP models.

.. _unsaved_device_warning:

.. warning::

OTP devices are inherently stateful. For example, verifying a token is
logically a mutating operation on the device, which may involve
incrementing a counter or otherwise consuming a token. A device must be
committed to the database before it can be used in any way.

.. attribute:: user

*ForeignKey*: Foreign key to your user model, as configured by
:setting:`AUTH_USER_MODEL` (:class:`~django.contrib.auth.models.User`
by default).

.. attribute:: name

*CharField*: A human-readable name to help the user identify their
devices.

.. attribute:: confirmed

*BooleanField*: A boolean value that tells us whether this device has
been confirmed as valid. It defaults to ``True``, but subclasses or
individual deployments can force it to ``False`` if they wish to create
a device and then ask the user for confirmation. As a rule, built-in
APIs that enumerate devices will only include those that are confirmed.

.. attribute:: objects

A :class:`~authentik.stages.authenticator.models.DeviceManager`.
user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def confirmed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_used(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def objects(unknown):

The DeviceManager instance installed as Device.objects.

persistent_id
105    @property
106    def persistent_id(self):
107        """
108        A stable device identifier for forms and APIs.
109        """
110        return f"{self.model_label()}/{self.id}"

A stable device identifier for forms and APIs.

@classmethod
def model_label(cls):
112    @classmethod
113    def model_label(cls):
114        """
115        Returns an identifier for this Django model class.
116
117        This is just the standard "<app_label>.<model_name>" form.
118
119        """
120        return f"{cls._meta.app_label}.{cls._meta.model_name}"

Returns an identifier for this Django model class.

This is just the standard "<app_label>.<model_name>" form.

@classmethod
def from_persistent_id(cls, persistent_id, for_verify=False):
122    @classmethod
123    def from_persistent_id(cls, persistent_id, for_verify=False):
124        """
125        Loads a device from its persistent id::
126
127            device == Device.from_persistent_id(device.persistent_id)
128
129        :param bool for_verify: If ``True``, we'll load the device with
130            :meth:`~django.db.models.query.QuerySet.select_for_update` to
131            prevent concurrent verifications from succeeding. In which case,
132            this must be called inside a transaction.
133
134        """
135        device = None
136
137        try:
138            model_label, device_id = persistent_id.rsplit("/", 1)
139            app_label, model_name = model_label.split(".")
140
141            device_cls = apps.get_model(app_label, model_name)
142            if issubclass(device_cls, Device):
143                device_set = device_cls.objects.filter(id=int(device_id))
144                if for_verify:
145                    device_set = device_set.select_for_update()
146                device = device_set.first()
147        except ValueError, LookupError:
148            pass
149
150        return device

Loads a device from its persistent id::

device == Device.from_persistent_id(device.persistent_id)

:param bool for_verify: If True, we'll load the device with :meth:~django.db.models.query.QuerySet.select_for_update to prevent concurrent verifications from succeeding. In which case, this must be called inside a transaction.

def is_interactive(self):
152    def is_interactive(self):
153        """
154        Returns ``True`` if this is an interactive device. The default
155        implementation returns ``True`` if
156        :meth:`~authentik.stages.authenticator.models.Device.generate_challenge` has been
157        overridden, but subclasses are welcome to provide smarter
158        implementations.
159
160        :rtype: bool
161        """
162        return not hasattr(self.generate_challenge, "stub")

Returns True if this is an interactive device. The default implementation returns True if :meth:~authentik.stages.authenticator.models.Device.generate_challenge has been overridden, but subclasses are welcome to provide smarter implementations.

:rtype: bool

def generate_challenge(self):
164    def generate_challenge(self):
165        """
166        Generates a challenge value that the user will need to produce a token.
167        This method is permitted to have side effects, such as transmitting
168        information to the user through some other channel (email or SMS,
169        perhaps). And, of course, some devices may need to commit the
170        challenge to the database.
171
172        :returns: A message to the user. This should be a string that fits
173            comfortably in the template ``'OTP Challenge: {0}'``. This may
174            return ``None`` if this device is not interactive.
175        :rtype: string or ``None``
176
177        :raises: Any :exc:`~exceptions.Exception` is permitted. Callers should
178            trap ``Exception`` and report it to the user.
179        """
180        return None

Generates a challenge value that the user will need to produce a token. This method is permitted to have side effects, such as transmitting information to the user through some other channel (email or SMS, perhaps). And, of course, some devices may need to commit the challenge to the database.

:returns: A message to the user. This should be a string that fits comfortably in the template 'OTP Challenge: {0}'. This may return None if this device is not interactive. :rtype: string or None

:raises: Any :exc:~exceptions.Exception is permitted. Callers should trap Exception and report it to the user.

def verify_is_allowed(self):
184    def verify_is_allowed(self):
185        """
186        Checks whether it is permissible to call :meth:`verify_token`. If it is
187        allowed, returns ``(True, None)``. Otherwise returns ``(False,
188        data_dict)``, where ``data_dict`` contains extra information, defined
189        by the implementation.
190
191        This method can be used to implement throttling or locking, for
192        example. Client code should check this method before calling
193        :meth:`verify_token` and report problems to the user.
194
195        To report specific problems, the data dictionary can return include a
196        ``'reason'`` member with a value from the constants in
197        :class:`VerifyNotAllowed`. Otherwise, an ``'error_message'`` member
198        should be provided with an error message.
199
200        :meth:`verify_token` should also call this method and return False if
201        verification is not allowed.
202
203        :rtype: (bool, dict or ``None``)
204
205        """
206        return (True, None)

Checks whether it is permissible to call :meth:verify_token. If it is allowed, returns (True, None). Otherwise returns (False, data_dict), where data_dict contains extra information, defined by the implementation.

This method can be used to implement throttling or locking, for example. Client code should check this method before calling :meth:verify_token and report problems to the user.

To report specific problems, the data dictionary can include a 'reason' member with a value from the constants in :class:VerifyNotAllowed. Otherwise, an 'error_message' member should be provided with an error message.

:meth:verify_token should also call this method and return False if verification is not allowed.

:rtype: (bool, dict or None)

def verify_token(self, token):
208    def verify_token(self, token):
209        """
210        Verifies a token. As a rule, the token should no longer be valid if
211        this returns ``True``.
212
213        :param str token: The OTP token provided by the user.
214        :rtype: bool
215        """
216        return False

Verifies a token. As a rule, the token should no longer be valid if this returns True.

:param str token: The OTP token provided by the user. :rtype: bool

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

user_id
def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class Device.Meta:
    class Meta:
        # Abstract base model: Django creates no database table for it.
        abstract = True
abstract = False
class SideChannelDevice(Device):
class SideChannelDevice(Device):
    """
    Abstract base model for a side-channel :term:`device` attached to a user.

    This model implements token generation, verification and expiration, so the
    concrete devices only have to implement delivery.

    """

    # The currently pending token; set back to ``None`` once a verification
    # succeeds (see verify_token()).
    token = models.CharField(max_length=16, blank=True, null=True)

    valid_until = models.DateTimeField(
        default=timezone.now,
        help_text="The timestamp of the moment of expiry of the saved token.",
    )

    class Meta:
        abstract = True

    def generate_token(self, length=6, valid_secs=300, commit=True):
        """
        Generates a token of the specified length, then sets it on the model
        and sets the expiration of the token on the model.

        Pass 'commit=False' to avoid calling self.save().

        :param int length: Number of decimal digits in the generated token.
            The ``token`` field is declared with ``max_length=16``.
        :param int valid_secs: Amount of seconds the token should be valid.
        :param bool commit: Whether to autosave the generated token.

        """
        self.token = random_number_token(length)
        self.valid_until = timezone.now() + timedelta(seconds=valid_secs)
        if commit:
            self.save()

    def verify_token(self, token):
        """
        Verifies a token by content and expiry.

        The submitted token is compared with the stored one using
        :func:`hmac.compare_digest`, so the duration of a failed check does not
        depend on how many leading characters were correct. A value that is not
        a ``str`` never matches.

        On success, the token is cleared and the device saved.

        :param str token: The OTP token provided by the user.
        :rtype: bool

        """
        # Function-scope import: keeps the new dependency local to this method.
        from hmac import compare_digest

        _now = timezone.now()
        expected = self.token

        # No pending token (None), or a non-string on either side: no match.
        if not isinstance(expected, str) or not isinstance(token, str):
            return False

        # compare_digest() only accepts ASCII ``str``, so compare encoded bytes.
        # "surrogatepass" keeps encode() from raising on lone surrogates that
        # may appear in user input.
        matches = compare_digest(
            token.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )
        if not matches or not _now < self.valid_until:
            return False

        # Consume the token and expire it immediately.
        self.token = None
        self.valid_until = _now
        self.save()

        return True

Abstract base model for a side-channel :term:device attached to a user.

This model implements token generation, verification and expiration, so the concrete devices only have to implement delivery.

def token(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def valid_until(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def generate_token(self, length=6, valid_secs=300, commit=True):
238    def generate_token(self, length=6, valid_secs=300, commit=True):
239        """
240        Generates a token of the specified length, then sets it on the model
241        and sets the expiration of the token on the model.
242
243        Pass 'commit=False' to avoid calling self.save().
244
245        :param int length: Number of decimal digits in the generated token.
246        :param int valid_secs: Amount of seconds the token should be valid.
247        :param bool commit: Whether to autosave the generated token.
248
249        """
250        self.token = random_number_token(length)
251        self.valid_until = timezone.now() + timedelta(seconds=valid_secs)
252        if commit:
253            self.save()

Generates a token of the specified length, then sets it on the model and sets the expiration of the token on the model.

Pass 'commit=False' to avoid calling self.save().

:param int length: Number of decimal digits in the generated token. :param int valid_secs: Amount of seconds the token should be valid. :param bool commit: Whether to autosave the generated token.

def verify_token(self, token):
255    def verify_token(self, token):
256        """
257        Verifies a token by content and expiry.
258
259        On success, the token is cleared and the device saved.
260
261        :param str token: The OTP token provided by the user.
262        :rtype: bool
263
264        """
265        _now = timezone.now()
266
267        if (self.token is not None) and (token == self.token) and (_now < self.valid_until):
268            self.token = None
269            self.valid_until = _now
270            self.save()
271
272            return True
273        return False

Verifies a token by content and expiry.

On success, the token is cleared and the device saved.

:param str token: The OTP token provided by the user. :rtype: bool

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def confirmed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_used(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def get_next_by_valid_until(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_valid_until(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

user_id
class SideChannelDevice.Meta:
    class Meta:
        # Abstract base model: Django creates no database table for it.
        abstract = True
abstract = False
class VerifyNotAllowed:
class VerifyNotAllowed:
    """
    Namespace for the values that may appear under the ``reason`` key of the
    extra information dictionary returned by
    :meth:`~authentik.stages.authenticator.models.Device.verify_is_allowed`.

    .. data:: N_FAILED_ATTEMPTS

       Verification is refused because of ``n`` successive failed attempts.
       The data dictionary should carry the value of ``n`` in its
       ``failure_count`` member.

    """

    N_FAILED_ATTEMPTS = "N_FAILED_ATTEMPTS"

Constants that may be returned in the reason member of the extra information dictionary returned by :meth:~authentik.stages.authenticator.models.Device.verify_is_allowed.

.. data:: N_FAILED_ATTEMPTS

Indicates that verification is disallowed because of n successive failed attempts. The data dictionary should include the value of n in member failure_count.

N_FAILED_ATTEMPTS = 'N_FAILED_ATTEMPTS'
class ThrottlingMixin(django.db.models.base.Model):
class ThrottlingMixin(models.Model):
    """
    Mixin that adds exponential back-off to token verification.

    Every successive failure doubles the delay required before the next
    attempt is allowed. Subclasses must implement
    :meth:`get_throttle_factor`, and their verify_token() method must make use
    of :meth:`verify_is_allowed`, :meth:`throttle_reset` and
    :meth:`throttle_increment`.

    See the implementation of
    :class:`~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice` for an example.

    """

    throttling_failure_timestamp = models.DateTimeField(
        null=True,
        blank=True,
        default=None,
        help_text=(
            "A timestamp of the last failed verification attempt. "
            "Null if last attempt succeeded."
        ),
    )

    throttling_failure_count = models.PositiveIntegerField(
        default=0, help_text="Number of successive failed attempts."
    )

    class Meta:
        abstract = True

    def verify_is_allowed(self):
        """
        Decide whether a verification attempt may be made right now.

        Returns ``(True, None)`` when it may (as reported by the next class in
        the MRO), otherwise ``(False, data_dict)`` where ``data_dict`` is::

            {
                'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
                'failure_count': n,
                'locked_until': t
            }

        ``n`` is the number of successive failures and ``t`` is the time of
        the last failure plus the required delay. See
        :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`.

        """
        throttled = (
            self.throttling_enabled
            and self.throttling_failure_count > 0
            and self.throttling_failure_timestamp is not None
        )
        if throttled:
            elapsed = (timezone.now() - self.throttling_failure_timestamp).total_seconds()
            # Required delays should be 1, 2, 4, 8 ... (times the factor)
            required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
            if elapsed < required:
                details = {
                    "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS,
                    "failure_count": self.throttling_failure_count,
                    "locked_until": self.throttling_failure_timestamp
                    + timedelta(seconds=required),
                }
                return False, details

        return super().verify_is_allowed()

    def throttle_reset(self, commit=True):
        """
        Clear the throttling state (normally after a verify attempt
        succeeded).

        Pass 'commit=False' to avoid calling self.save().

        """
        self.throttling_failure_timestamp, self.throttling_failure_count = None, 0
        if not commit:
            return
        self.save()

    def throttle_increment(self, commit=True):
        """
        Record one more failure (normally after a verify attempt failed): the
        failure timestamp becomes the current time and the counter grows by
        one.

        Pass 'commit=False' to avoid calling self.save().

        """
        self.throttling_failure_timestamp = timezone.now()
        self.throttling_failure_count = self.throttling_failure_count + 1
        if not commit:
            return
        self.save()

    @cached_property
    def throttling_enabled(self) -> bool:
        """Check if throttling is enabled, i.e. the throttle factor is above zero"""
        factor = self.get_throttle_factor()
        return factor > 0

    def get_throttle_factor(self):  # pragma: no cover
        """
        Return the throttle factor; subclasses have to override this.

        With factor `c` and `n` previous failures, the number of seconds
        required between verification attempts is :math:`c2^{n-1}`. A factor
        of 1 therefore gives delays of 1, 2, 4, 8, etc. seconds, and a factor
        of 0 disables the throttling.

        Normally this is just a wrapper for a plugin-specific setting like
        :setting:`OTP_EMAIL_THROTTLE_FACTOR`.

        """
        raise NotImplementedError()

Mixin class for models that want throttling behaviour.

This implements exponential back-off for verifying tokens. Subclasses must implement :meth:get_throttle_factor, and must use the :meth:verify_is_allowed, :meth:throttle_reset and :meth:throttle_increment methods from within their verify_token() method.

See the implementation of :class:~authentik.stages.authenticator.plugins.otp_email.models.EmailDevice for an example.

def throttling_failure_timestamp(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def throttling_failure_count(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def verify_is_allowed(self):
324    def verify_is_allowed(self):
325        """
326        If verification is allowed, returns ``(True, None)``.
327        Otherwise, returns ``(False, data_dict)``.
328
329        ``data_dict`` contains further information. Currently it can be::
330
331            {
332                'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
333                'failure_count': n
334            }
335
336        where ``n`` is the number of successive failures. See
337        :class:`~authentik.stages.authenticator.models.VerifyNotAllowed`.
338
339        """
340        if (
341            self.throttling_enabled
342            and self.throttling_failure_count > 0
343            and self.throttling_failure_timestamp is not None
344        ):
345            now = timezone.now()
346            delay = (now - self.throttling_failure_timestamp).total_seconds()
347            # Required delays should be 1, 2, 4, 8 ...
348            delay_required = self.get_throttle_factor() * (2 ** (self.throttling_failure_count - 1))
349            if delay < delay_required:
350                return (
351                    False,
352                    {
353                        "reason": VerifyNotAllowed.N_FAILED_ATTEMPTS,
354                        "failure_count": self.throttling_failure_count,
355                        "locked_until": self.throttling_failure_timestamp
356                        + timedelta(seconds=delay_required),
357                    },
358                )
359
360        return super().verify_is_allowed()

If verification is allowed, returns (True, None). Otherwise, returns (False, data_dict).

data_dict contains further information. Currently it can be::

{
    'reason': VerifyNotAllowed.N_FAILED_ATTEMPTS,
    'failure_count': n,
    'locked_until': t
}

where n is the number of successive failures and t is the time of the last failure plus the required delay. See :class:~authentik.stages.authenticator.models.VerifyNotAllowed.

def throttle_reset(self, commit=True):
362    def throttle_reset(self, commit=True):
363        """
364        Call this method to reset throttling (normally when a verify attempt
365        succeeded).
366
367        Pass 'commit=False' to avoid calling self.save().
368
369        """
370        self.throttling_failure_timestamp = None
371        self.throttling_failure_count = 0
372        if commit:
373            self.save()

Call this method to reset throttling (normally when a verify attempt succeeded).

Pass 'commit=False' to avoid calling self.save().

def throttle_increment(self, commit=True):
375    def throttle_increment(self, commit=True):
376        """
377        Call this method to increase throttling (normally when a verify attempt
378        failed).
379
380        Pass 'commit=False' to avoid calling self.save().
381
382        """
383        self.throttling_failure_timestamp = timezone.now()
384        self.throttling_failure_count += 1
385        if commit:
386            self.save()

Call this method to increase throttling (normally when a verify attempt failed).

Pass 'commit=False' to avoid calling self.save().

def throttling_enabled(unknown):

Check if throttling is enabled

def get_throttle_factor(self):
393    def get_throttle_factor(self):  # pragma: no cover
394        """
395        This must be implemented to return the throttle factor.
396
397        The number of seconds required between verification attempts will be
398        :math:`c2^{n-1}` where `c` is this factor and `n` is the number of
399        previous failures. A factor of 1 translates to delays of 1, 2, 4, 8,
400        etc. seconds. A factor of 0 disables the throttling.
401
402        Normally this is just a wrapper for a plugin-specific setting like
403        :setting:`OTP_EMAIL_THROTTLE_FACTOR`.
404
405        """
406        raise NotImplementedError()

This must be implemented to return the throttle factor.

The number of seconds required between verification attempts will be :math:c2^{n-1} where c is this factor and n is the number of previous failures. A factor of 1 translates to delays of 1, 2, 4, 8, etc. seconds. A factor of 0 disables the throttling.

Normally this is just a wrapper for a plugin-specific setting like :setting:OTP_EMAIL_THROTTLE_FACTOR.

class ThrottlingMixin.Meta:
    class Meta:
        # Abstract base model: Django creates no database table for it.
        abstract = True
abstract = False