authentik.stages.authenticator_email.models

  1from django.contrib.auth import get_user_model
  2from django.core.mail.backends.base import BaseEmailBackend
  3from django.core.mail.backends.smtp import EmailBackend
  4from django.db import models
  5from django.template import TemplateSyntaxError
  6from django.utils.translation import gettext_lazy as _
  7from django.views import View
  8from rest_framework.serializers import BaseSerializer
  9
 10from authentik.core.types import UserSettingSerializer
 11from authentik.events.models import Event, EventAction
 12from authentik.flows.exceptions import StageInvalidException
 13from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
 14from authentik.lib.config import CONFIG
 15from authentik.lib.models import SerializerModel
 16from authentik.lib.utils.time import timedelta_string_validator
 17from authentik.stages.authenticator.models import SideChannelDevice, ThrottlingMixin
 18from authentik.stages.email.models import EmailTemplates
 19from authentik.stages.email.utils import TemplateEmailMessage
 20
 21
 22class AuthenticatorEmailStage(ConfigurableStage, FriendlyNamedStage, Stage):
 23    """Setup Email-based authentication for the user."""
 24
 25    use_global_settings = models.BooleanField(
 26        default=False,
 27        help_text=_(
 28            "When enabled, global Email connection settings will be used and "
 29            "connection settings below will be ignored."
 30        ),
 31    )
 32
 33    host = models.TextField(default="localhost")
 34    port = models.IntegerField(default=25)
 35    username = models.TextField(default="", blank=True)
 36    password = models.TextField(default="", blank=True)
 37    use_tls = models.BooleanField(default=False)
 38    use_ssl = models.BooleanField(default=False)
 39    timeout = models.IntegerField(default=10)
 40    from_address = models.EmailField(default="system@authentik.local")
 41
 42    token_expiry = models.TextField(
 43        default="minutes=30",
 44        validators=[timedelta_string_validator],
 45        help_text=_("Time the token sent is valid (Format: hours=3,minutes=17,seconds=300)."),
 46    )
 47    subject = models.TextField(default="authentik Sign-in code")
 48    template = models.TextField(default=EmailTemplates.EMAIL_OTP)
 49
 50    @property
 51    def serializer(self) -> type[BaseSerializer]:
 52        from authentik.stages.authenticator_email.api import AuthenticatorEmailStageSerializer
 53
 54        return AuthenticatorEmailStageSerializer
 55
 56    @property
 57    def view(self) -> type[View]:
 58        from authentik.stages.authenticator_email.stage import AuthenticatorEmailStageView
 59
 60        return AuthenticatorEmailStageView
 61
 62    @property
 63    def component(self) -> str:
 64        return "ak-stage-authenticator-email-form"
 65
 66    def ui_user_settings(self) -> UserSettingSerializer | None:
 67        return UserSettingSerializer(
 68            data={
 69                "title": self.friendly_name or str(self._meta.verbose_name),
 70                "component": "ak-user-settings-authenticator-email",
 71            }
 72        )
 73
 74    @property
 75    def backend_class(self) -> type[BaseEmailBackend]:
 76        """Get the email backend class to use"""
 77        return EmailBackend
 78
 79    @property
 80    def backend(self) -> BaseEmailBackend:
 81        """Get fully configured Email Backend instance"""
 82        if self.use_global_settings:
 83            CONFIG.refresh("email.password")
 84            return self.backend_class(
 85                host=CONFIG.get("email.host"),
 86                port=CONFIG.get_int("email.port"),
 87                username=CONFIG.get("email.username"),
 88                password=CONFIG.get("email.password"),
 89                use_tls=CONFIG.get_bool("email.use_tls", False),
 90                use_ssl=CONFIG.get_bool("email.use_ssl", False),
 91                timeout=CONFIG.get_int("email.timeout"),
 92            )
 93        return self.backend_class(
 94            host=self.host,
 95            port=self.port,
 96            username=self.username,
 97            password=self.password,
 98            use_tls=self.use_tls,
 99            use_ssl=self.use_ssl,
100            timeout=self.timeout,
101        )
102
103    def send(self, device: EmailDevice):
104        # Lazy import here to avoid circular import
105        from authentik.stages.email.tasks import send_mails
106
107        # Compose the message using templates
108        message = device._compose_email()
109        return send_mails(device.stage, message)
110
111    def __str__(self):
112        return f"Email Authenticator Stage {self.name}"
113
114    class Meta:
115        verbose_name = _("Email Authenticator Setup Stage")
116        verbose_name_plural = _("Email Authenticator Setup Stages")
117
118
class EmailDevice(SerializerModel, ThrottlingMixin, SideChannelDevice):
    """Email Device"""

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    email = models.EmailField()
    stage = models.ForeignKey(AuthenticatorEmailStage, on_delete=models.CASCADE)
    last_used = models.DateTimeField(auto_now=True)

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this device."""
        from authentik.stages.authenticator_email.api import EmailDeviceSerializer

        return EmailDeviceSerializer

    def verify_token(self, token: str) -> bool:
        """Verify `token`, honouring and updating the throttling state.

        Returns False without checking the token when `verify_is_allowed`
        refuses the attempt. Otherwise the parent implementation decides;
        a success resets the throttle and a failure increments it.
        """
        # The second element is unused; it is not named `_` so the
        # module-level gettext alias is not rebound in this scope.
        verify_allowed, _details = self.verify_is_allowed()
        if not verify_allowed:
            return False

        verified = super().verify_token(token)
        if verified:
            self.throttle_reset()
        else:
            self.throttle_increment()
        return verified

    def _compose_email(self) -> TemplateEmailMessage:
        """Build the templated email carrying this device's token.

        Raises:
            StageInvalidException: if constructing the message raises a
                TemplateSyntaxError. A CONFIGURATION_ERROR event is recorded
                before the exception is raised.
        """
        # Resolved outside the try block so `stage` is always bound in the
        # handler and only message construction is guarded.
        pending_user = self.user
        stage = self.stage

        try:
            return TemplateEmailMessage(
                subject=_(stage.subject),
                to=[(pending_user.name, self.email)],
                template_name=stage.template,
                template_context={
                    "user": pending_user,
                    "expires": self.valid_until,
                    "token": self.token,
                },
            )
        except TemplateSyntaxError as exc:
            event = Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=_("Exception occurred while rendering E-mail template"),
                template=stage.template,
            ).with_exception(exc)
            # This model declares no `request` attribute, so reading
            # `self.request` unconditionally would raise AttributeError and
            # hide the StageInvalidException. Use a request only when a
            # caller attached one to the instance.
            request = getattr(self, "request", None)
            if request is not None:
                event.from_http(request)
            else:
                # NOTE(review): assumes with_exception() returns the Event
                # model instance, as the original call chain implies - confirm.
                event.save()
            raise StageInvalidException from exc

    def __str__(self):
        if not self.pk:
            return "New Email Device"
        return f"Email Device for {self.user_id}"

    class Meta:
        verbose_name = _("Email Device")
        verbose_name_plural = _("Email Devices")
        unique_together = (("user", "email"),)
 23class AuthenticatorEmailStage(ConfigurableStage, FriendlyNamedStage, Stage):
 24    """Setup Email-based authentication for the user."""
 25
 26    use_global_settings = models.BooleanField(
 27        default=False,
 28        help_text=_(
 29            "When enabled, global Email connection settings will be used and "
 30            "connection settings below will be ignored."
 31        ),
 32    )
 33
 34    host = models.TextField(default="localhost")
 35    port = models.IntegerField(default=25)
 36    username = models.TextField(default="", blank=True)
 37    password = models.TextField(default="", blank=True)
 38    use_tls = models.BooleanField(default=False)
 39    use_ssl = models.BooleanField(default=False)
 40    timeout = models.IntegerField(default=10)
 41    from_address = models.EmailField(default="system@authentik.local")
 42
 43    token_expiry = models.TextField(
 44        default="minutes=30",
 45        validators=[timedelta_string_validator],
 46        help_text=_("Time the token sent is valid (Format: hours=3,minutes=17,seconds=300)."),
 47    )
 48    subject = models.TextField(default="authentik Sign-in code")
 49    template = models.TextField(default=EmailTemplates.EMAIL_OTP)
 50
 51    @property
 52    def serializer(self) -> type[BaseSerializer]:
 53        from authentik.stages.authenticator_email.api import AuthenticatorEmailStageSerializer
 54
 55        return AuthenticatorEmailStageSerializer
 56
 57    @property
 58    def view(self) -> type[View]:
 59        from authentik.stages.authenticator_email.stage import AuthenticatorEmailStageView
 60
 61        return AuthenticatorEmailStageView
 62
 63    @property
 64    def component(self) -> str:
 65        return "ak-stage-authenticator-email-form"
 66
 67    def ui_user_settings(self) -> UserSettingSerializer | None:
 68        return UserSettingSerializer(
 69            data={
 70                "title": self.friendly_name or str(self._meta.verbose_name),
 71                "component": "ak-user-settings-authenticator-email",
 72            }
 73        )
 74
 75    @property
 76    def backend_class(self) -> type[BaseEmailBackend]:
 77        """Get the email backend class to use"""
 78        return EmailBackend
 79
 80    @property
 81    def backend(self) -> BaseEmailBackend:
 82        """Get fully configured Email Backend instance"""
 83        if self.use_global_settings:
 84            CONFIG.refresh("email.password")
 85            return self.backend_class(
 86                host=CONFIG.get("email.host"),
 87                port=CONFIG.get_int("email.port"),
 88                username=CONFIG.get("email.username"),
 89                password=CONFIG.get("email.password"),
 90                use_tls=CONFIG.get_bool("email.use_tls", False),
 91                use_ssl=CONFIG.get_bool("email.use_ssl", False),
 92                timeout=CONFIG.get_int("email.timeout"),
 93            )
 94        return self.backend_class(
 95            host=self.host,
 96            port=self.port,
 97            username=self.username,
 98            password=self.password,
 99            use_tls=self.use_tls,
100            use_ssl=self.use_ssl,
101            timeout=self.timeout,
102        )
103
104    def send(self, device: EmailDevice):
105        # Lazy import here to avoid circular import
106        from authentik.stages.email.tasks import send_mails
107
108        # Compose the message using templates
109        message = device._compose_email()
110        return send_mails(device.stage, message)
111
112    def __str__(self):
113        return f"Email Authenticator Stage {self.name}"
114
115    class Meta:
116        verbose_name = _("Email Authenticator Setup Stage")
117        verbose_name_plural = _("Email Authenticator Setup Stages")

Setup Email-based authentication for the user.

def use_global_settings(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def host(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def port(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def username(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def use_tls(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def use_ssl(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def timeout(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def from_address(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def token_expiry(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def subject(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def template(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
51    @property
52    def serializer(self) -> type[BaseSerializer]:
53        from authentik.stages.authenticator_email.api import AuthenticatorEmailStageSerializer
54
55        return AuthenticatorEmailStageSerializer

Get serializer for this model

view: type[django.views.generic.base.View]
57    @property
58    def view(self) -> type[View]:
59        from authentik.stages.authenticator_email.stage import AuthenticatorEmailStageView
60
61        return AuthenticatorEmailStageView

Return StageView class that implements logic for this stage

component: str
63    @property
64    def component(self) -> str:
65        return "ak-stage-authenticator-email-form"

Return component used to edit this object

def ui_user_settings(self) -> authentik.core.types.UserSettingSerializer | None:
67    def ui_user_settings(self) -> UserSettingSerializer | None:
68        return UserSettingSerializer(
69            data={
70                "title": self.friendly_name or str(self._meta.verbose_name),
71                "component": "ak-user-settings-authenticator-email",
72            }
73        )

Entrypoint to integrate with User settings. Can either return None if no user settings are available, or a challenge.

backend_class: type[django.core.mail.backends.base.BaseEmailBackend]
75    @property
76    def backend_class(self) -> type[BaseEmailBackend]:
77        """Get the email backend class to use"""
78        return EmailBackend

Get the email backend class to use

backend: django.core.mail.backends.base.BaseEmailBackend
 80    @property
 81    def backend(self) -> BaseEmailBackend:
 82        """Get fully configured Email Backend instance"""
 83        if self.use_global_settings:
 84            CONFIG.refresh("email.password")
 85            return self.backend_class(
 86                host=CONFIG.get("email.host"),
 87                port=CONFIG.get_int("email.port"),
 88                username=CONFIG.get("email.username"),
 89                password=CONFIG.get("email.password"),
 90                use_tls=CONFIG.get_bool("email.use_tls", False),
 91                use_ssl=CONFIG.get_bool("email.use_ssl", False),
 92                timeout=CONFIG.get_int("email.timeout"),
 93            )
 94        return self.backend_class(
 95            host=self.host,
 96            port=self.port,
 97            username=self.username,
 98            password=self.password,
 99            use_tls=self.use_tls,
100            use_ssl=self.use_ssl,
101            timeout=self.timeout,
102        )

Get fully configured Email Backend instance

def send(self, device: EmailDevice):
104    def send(self, device: EmailDevice):
105        # Lazy import here to avoid circular import
106        from authentik.stages.email.tasks import send_mails
107
108        # Compose the message using templates
109        message = device._compose_email()
110        return send_mails(device.stage, message)
configure_flow

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def friendly_name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

configure_flow_id
stage_ptr_id
stage_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

emaildevice_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class AuthenticatorEmailStage.DoesNotExist(authentik.flows.models.Stage.DoesNotExist):

The requested object does not exist

class AuthenticatorEmailStage.MultipleObjectsReturned(authentik.flows.models.Stage.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class EmailDevice(SerializerModel, ThrottlingMixin, SideChannelDevice):
    """Email Device"""

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    email = models.EmailField()
    stage = models.ForeignKey(AuthenticatorEmailStage, on_delete=models.CASCADE)
    last_used = models.DateTimeField(auto_now=True)

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this device."""
        from authentik.stages.authenticator_email.api import EmailDeviceSerializer

        return EmailDeviceSerializer

    def verify_token(self, token: str) -> bool:
        """Verify `token`, honouring and updating the throttling state.

        Returns False without checking the token when `verify_is_allowed`
        refuses the attempt. Otherwise the parent implementation decides;
        a success resets the throttle and a failure increments it.
        """
        # The second element is unused; it is not named `_` so the
        # module-level gettext alias is not rebound in this scope.
        verify_allowed, _details = self.verify_is_allowed()
        if not verify_allowed:
            return False

        verified = super().verify_token(token)
        if verified:
            self.throttle_reset()
        else:
            self.throttle_increment()
        return verified

    def _compose_email(self) -> TemplateEmailMessage:
        """Build the templated email carrying this device's token.

        Raises:
            StageInvalidException: if constructing the message raises a
                TemplateSyntaxError. A CONFIGURATION_ERROR event is recorded
                before the exception is raised.
        """
        # Resolved outside the try block so `stage` is always bound in the
        # handler and only message construction is guarded.
        pending_user = self.user
        stage = self.stage

        try:
            return TemplateEmailMessage(
                subject=_(stage.subject),
                to=[(pending_user.name, self.email)],
                template_name=stage.template,
                template_context={
                    "user": pending_user,
                    "expires": self.valid_until,
                    "token": self.token,
                },
            )
        except TemplateSyntaxError as exc:
            event = Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=_("Exception occurred while rendering E-mail template"),
                template=stage.template,
            ).with_exception(exc)
            # This model declares no `request` attribute, so reading
            # `self.request` unconditionally would raise AttributeError and
            # hide the StageInvalidException. Use a request only when a
            # caller attached one to the instance.
            request = getattr(self, "request", None)
            if request is not None:
                event.from_http(request)
            else:
                # NOTE(review): assumes with_exception() returns the Event
                # model instance, as the original call chain implies - confirm.
                event.save()
            raise StageInvalidException from exc

    def __str__(self):
        if not self.pk:
            return "New Email Device"
        return f"Email Device for {self.user_id}"

    class Meta:
        verbose_name = _("Email Device")
        verbose_name_plural = _("Email Devices")
        unique_together = (("user", "email"),)

Email Device

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def email(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

stage

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def last_used(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
128    @property
129    def serializer(self) -> type[BaseSerializer]:
130        from authentik.stages.authenticator_email.api import EmailDeviceSerializer
131
132        return EmailDeviceSerializer

Get serializer for this model

def verify_token(self, token: str) -> bool:
134    def verify_token(self, token: str) -> bool:
135        verify_allowed, _ = self.verify_is_allowed()
136        if verify_allowed:
137            verified = super().verify_token(token)
138
139            if verified:
140                self.throttle_reset()
141            else:
142                self.throttle_increment()
143        else:
144            verified = False
145
146        return verified

Verifies a token by content and expiry.

On success, the token is cleared and the device saved.

:param str token: The OTP token provided by the user.
:rtype: bool

def throttling_failure_timestamp(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def throttling_failure_count(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def token(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def valid_until(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def confirmed(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

user_id
stage_id
def get_next_by_last_used(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_used(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_valid_until(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_valid_until(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

class EmailDevice.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class EmailDevice.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.