authentik.events.models

authentik events models

  1"""authentik events models"""
  2
  3from collections.abc import Generator
  4from datetime import timedelta
  5from difflib import get_close_matches
  6from functools import lru_cache
  7from inspect import currentframe
  8from typing import Any
  9from uuid import uuid4
 10
 11from channels.layers import get_channel_layer
 12from django.apps import apps
 13from django.db import models
 14from django.db.models import Q
 15from django.http import HttpRequest
 16from django.http.request import QueryDict
 17from django.utils.timezone import now
 18from django.utils.translation import gettext as _
 19from requests import RequestException
 20from rest_framework.serializers import Serializer
 21from structlog.stdlib import get_logger
 22
 23from authentik import authentik_full_version
 24from authentik.brands.models import Brand
 25from authentik.brands.utils import DEFAULT_BRAND
 26from authentik.core.middleware import (
 27    SESSION_KEY_IMPERSONATE_ORIGINAL_USER,
 28    SESSION_KEY_IMPERSONATE_USER,
 29)
 30from authentik.core.models import ExpiringModel, Group, PropertyMapping, User
 31from authentik.crypto.models import CertificateKeyPair
 32from authentik.events.context_processors.base import get_context_processors
 33from authentik.events.utils import (
 34    cleanse_dict,
 35    get_user,
 36    model_to_dict,
 37    sanitize_dict,
 38    sanitize_item,
 39)
 40from authentik.lib.models import DomainlessURLValidator, SerializerModel
 41from authentik.lib.sentry import SentryIgnoredException
 42from authentik.lib.utils.errors import exception_to_dict
 43from authentik.lib.utils.http import get_http_session
 44from authentik.lib.utils.time import timedelta_from_string
 45from authentik.outposts.docker_tls import DockerInlineTLS
 46from authentik.policies.models import PolicyBindingModel
 47from authentik.root.middleware import ClientIPMiddleware
 48from authentik.root.ws.consumer import build_user_group
 49from authentik.stages.email.models import EmailTemplates
 50from authentik.stages.email.utils import TemplateEmailMessage
 51from authentik.tasks.models import TasksModel
 52from authentik.tenants.models import Tenant
 53from authentik.tenants.utils import get_current_tenant
 54
 55LOGGER = get_logger()
 56DISCORD_FIELD_LIMIT = 25
 57NOTIFICATION_SUMMARY_LENGTH = 75
 58
 59
 60def default_event_duration():
 61    """Default duration an Event is saved.
 62    This is used as a fallback when no brand is available"""
 63    try:
 64        tenant = get_current_tenant(only=["event_retention"])
 65        return now() + timedelta_from_string(tenant.event_retention)
 66    except Tenant.DoesNotExist:
 67        return now() + timedelta(days=365)
 68
 69
 70def default_brand():
 71    """Get a default value for brand"""
 72    return sanitize_dict(model_to_dict(DEFAULT_BRAND))
 73
 74
 75@lru_cache
 76def django_app_names() -> list[str]:
 77    """Get a cached list of all django apps' names (not labels)"""
 78    return [x.name for x in apps.app_configs.values()]
 79
 80
 81class NotificationTransportError(SentryIgnoredException):
 82    """Error raised when a notification fails to be delivered"""
 83
 84
 85class EventAction(models.TextChoices):
 86    """All possible actions to save into the events log"""
 87
 88    LOGIN = "login"
 89    LOGIN_FAILED = "login_failed"
 90    LOGOUT = "logout"
 91
 92    USER_WRITE = "user_write"
 93    SUSPICIOUS_REQUEST = "suspicious_request"
 94    PASSWORD_SET = "password_set"  # noqa # nosec
 95
 96    SECRET_VIEW = "secret_view"  # noqa # nosec
 97    SECRET_ROTATE = "secret_rotate"  # noqa # nosec
 98
 99    INVITE_USED = "invitation_used"
100
101    AUTHORIZE_APPLICATION = "authorize_application"
102    SOURCE_LINKED = "source_linked"
103
104    IMPERSONATION_STARTED = "impersonation_started"
105    IMPERSONATION_ENDED = "impersonation_ended"
106
107    FLOW_EXECUTION = "flow_execution"
108    POLICY_EXECUTION = "policy_execution"
109    POLICY_EXCEPTION = "policy_exception"
110    PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception"
111
112    SYSTEM_TASK_EXECUTION = "system_task_execution"
113    SYSTEM_TASK_EXCEPTION = "system_task_exception"
114    SYSTEM_EXCEPTION = "system_exception"
115
116    CONFIGURATION_ERROR = "configuration_error"
117    CONFIGURATION_WARNING = "configuration_warning"
118
119    MODEL_CREATED = "model_created"
120    MODEL_UPDATED = "model_updated"
121    MODEL_DELETED = "model_deleted"
122    EMAIL_SENT = "email_sent"
123
124    UPDATE_AVAILABLE = "update_available"
125
126    EXPORT_READY = "export_ready"
127
128    REVIEW_INITIATED = "review_initiated"
129    REVIEW_OVERDUE = "review_overdue"
130    REVIEW_ATTESTED = "review_attested"
131    REVIEW_COMPLETED = "review_completed"
132
133    CUSTOM_PREFIX = "custom_"
134
135
136class Event(SerializerModel, ExpiringModel):
137    """An individual Audit/Metrics/Notification/Error Event"""
138
139    event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
140    user = models.JSONField(default=dict)
141    action = models.TextField(choices=EventAction.choices)
142    app = models.TextField()
143    context = models.JSONField(default=dict, blank=True)
144    client_ip = models.GenericIPAddressField(null=True)
145    created = models.DateTimeField(auto_now_add=True)
146    brand = models.JSONField(default=default_brand, blank=True)
147
148    # Shadow the expires attribute from ExpiringModel to override the default duration
149    expires = models.DateTimeField(default=default_event_duration)
150
151    @staticmethod
152    def _get_app_from_request(request: HttpRequest) -> str:
153        if not isinstance(request, HttpRequest):
154            return ""
155        return request.resolver_match.app_name
156
157    @staticmethod
158    def new(
159        action: str | EventAction,
160        app: str | None = None,
161        **kwargs,
162    ) -> Event:
163        """Create new Event instance from arguments. Instance is NOT saved."""
164        if not isinstance(action, EventAction):
165            action = EventAction.CUSTOM_PREFIX + action
166        if not app:
167            current = currentframe()
168            parent = current.f_back
169            app = parent.f_globals["__name__"]
170            # Attempt to match the calling module to the django app it belongs to
171            # if we can't find a match, keep the module name
172            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
173            # Also ensure that closest django app has the correct prefix
174            if len(django_apps) > 0 and django_apps[0].startswith(app):
175                app = django_apps[0]
176        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
177        event = Event(action=action, app=app, context=cleaned_kwargs)
178        return event
179
180    def with_exception(self, exc: Exception) -> Event:
181        """Add data from 'exc' to the event in a database-saveable format"""
182        self.context.setdefault("message", str(exc))
183        self.context["exception"] = exception_to_dict(exc)
184        return self
185
186    def set_user(self, user: User) -> Event:
187        """Set `.user` based on user, ensuring the correct attributes are copied.
188        This should only be used when self.from_http is *not* used."""
189        self.user = get_user(user)
190        return self
191
192    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
193        """Add data from a Django-HttpRequest, allowing the creation of
194        Events independently from requests.
195        `user` arguments optionally overrides user from requests."""
196        if request:
197            from authentik.flows.views.executor import QS_QUERY
198
199            self.context["http_request"] = {
200                "path": request.path,
201                "method": request.method,
202                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
203                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
204            }
205            if hasattr(request, "request_id"):
206                self.context["http_request"]["request_id"] = request.request_id
207            # Special case for events created during flow execution
208            # since they keep the http query within a wrapped query
209            if QS_QUERY in self.context["http_request"]["args"]:
210                wrapped = self.context["http_request"]["args"][QS_QUERY]
211                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
212        if hasattr(request, "brand"):
213            brand: Brand = request.brand
214            self.brand = sanitize_dict(model_to_dict(brand))
215        if hasattr(request, "user"):
216            self.user = get_user(request.user)
217        if user:
218            self.user = get_user(user)
219        if hasattr(request, "session"):
220            from authentik.flows.views.executor import SESSION_KEY_PLAN
221
222            # Check if we're currently impersonating, and add that user
223            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
224                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
225                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
226            # Special case for events that happen during a flow, the user might not be authenticated
227            # yet but is a pending user instead
228            if SESSION_KEY_PLAN in request.session:
229                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
230
231                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
232                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
233                # Only save `authenticated_as` if there's a different pending user in the flow
234                # than the user that is authenticated
235                if pending_user and (
236                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
237                    or (not pending_user.pk)
238                ):
239                    orig_user = self.user.copy()
240
241                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
242        # User 255.255.255.255 as fallback if IP cannot be determined
243        self.client_ip = ClientIPMiddleware.get_client_ip(request)
244        # Enrich event data
245        for processor in get_context_processors():
246            processor.enrich_event(self)
247        # If there's no app set, we get it from the requests too
248        if not self.app:
249            self.app = Event._get_app_from_request(request)
250        self.save()
251        return self
252
253    @staticmethod
254    def log_deprecation(
255        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
256    ):
257        query = Q(
258            action=EventAction.CONFIGURATION_WARNING,
259            context__deprecation=identifier,
260        )
261        cause = str(cause)
262        if cause:
263            query &= Q(context__cause=cause)
264        if Event.objects.filter(query).exists():
265            return
266        event = Event.new(
267            EventAction.CONFIGURATION_WARNING,
268            deprecation=identifier,
269            message=message,
270            cause=cause,
271            **kwargs,
272        )
273        event.expires = now() + timedelta(days=expiry_days)
274        event.save()
275
276    def save(self, *args, **kwargs):
277        if self._state.adding:
278            LOGGER.info(
279                "Created Event",
280                action=self.action,
281                context=self.context,
282                client_ip=self.client_ip,
283                user=self.user,
284            )
285        super().save(*args, **kwargs)
286
287    @property
288    def serializer(self) -> type[Serializer]:
289        from authentik.events.api.events import EventSerializer
290
291        return EventSerializer
292
293    @property
294    def summary(self) -> str:
295        """Return a summary of this event."""
296        if "message" in self.context:
297            return self.context["message"]
298        return f"{self.action}: {self.context}"
299
300    @property
301    def hyperlink(self) -> str | None:
302        return self.context.get("hyperlink")
303
304    @property
305    def hyperlink_label(self) -> str | None:
306        return self.context.get("hyperlink_label")
307
308    def __str__(self) -> str:
309        return f"Event action={self.action} user={self.user} context={self.context}"
310
311    class Meta:
312        verbose_name = _("Event")
313        verbose_name_plural = _("Events")
314        indexes = ExpiringModel.Meta.indexes + [
315            models.Index(fields=["action"]),
316            models.Index(fields=["user"]),
317            models.Index(fields=["app"]),
318            models.Index(fields=["created"]),
319            models.Index(fields=["client_ip"]),
320            models.Index(
321                models.F("context__authorized_application"),
322                name="authentik_e_ctx_app__idx",
323            ),
324            models.Index(
325                models.F("user__pk"),
326                name="authentik_e_user_pk__idx",
327            ),
328        ]
329
330
331class TransportMode(models.TextChoices):
332    """Modes that a notification transport can send a notification"""
333
334    LOCAL = "local", _("authentik inbuilt notifications")
335    WEBHOOK = "webhook", _("Generic Webhook")
336    WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
337    EMAIL = "email", _("Email")
338
339
340class NotificationTransport(TasksModel, SerializerModel):
341    """Action which is executed when a Rule matches"""
342
343    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
344
345    name = models.TextField(unique=True)
346    mode = models.TextField(choices=TransportMode.choices, default=TransportMode.LOCAL)
347    send_once = models.BooleanField(
348        default=False,
349        help_text=_(
350            "Only send notification once, for example when sending a webhook into a chat channel."
351        ),
352    )
353
354    email_subject_prefix = models.TextField(default="authentik Notification: ", blank=True)
355    email_template = models.TextField(default=EmailTemplates.EVENT_NOTIFICATION)
356
357    webhook_url = models.TextField(blank=True, validators=[DomainlessURLValidator()])
358    webhook_ca = models.ForeignKey(
359        CertificateKeyPair,
360        null=True,
361        default=None,
362        on_delete=models.SET_DEFAULT,
363        help_text=_(
364            "When set, the selected certificate is used to "
365            "validate the certificate of the webhook server."
366        ),
367    )
368    webhook_mapping_body = models.ForeignKey(
369        "NotificationWebhookMapping",
370        on_delete=models.SET_DEFAULT,
371        null=True,
372        default=None,
373        related_name="+",
374        help_text=_(
375            "Customize the body of the request. "
376            "Mapping should return data that is JSON-serializable."
377        ),
378    )
379    webhook_mapping_headers = models.ForeignKey(
380        "NotificationWebhookMapping",
381        on_delete=models.SET_DEFAULT,
382        null=True,
383        default=None,
384        related_name="+",
385        help_text=_(
386            "Configure additional headers to be sent. "
387            "Mapping should return a dictionary of key-value pairs"
388        ),
389    )
390
391    def send(self, notification: Notification) -> list[str]:
392        """Send notification to user, called from async task"""
393        if self.mode == TransportMode.LOCAL:
394            return self.send_local(notification)
395        if self.mode == TransportMode.WEBHOOK:
396            return self.send_webhook(notification)
397        if self.mode == TransportMode.WEBHOOK_SLACK:
398            return self.send_webhook_slack(notification)
399        if self.mode == TransportMode.EMAIL:
400            return self.send_email(notification)
401        raise ValueError(f"Invalid mode {self.mode} set")
402
403    def send_local(self, notification: Notification) -> list[str]:
404        """Local notification delivery"""
405        if self.webhook_mapping_body:
406            self.webhook_mapping_body.evaluate(
407                user=notification.user,
408                request=None,
409                notification=notification,
410            )
411        notification.save()
412        layer = get_channel_layer()
413        layer.group_send_blocking(
414            build_user_group(notification.user),
415            {
416                "type": "event.notification",
417                "id": str(notification.pk),
418                "data": notification.serializer(notification).data,
419            },
420        )
421        return []
422
423    def send_webhook(self, notification: Notification) -> list[str]:
424        """Send notification to generic webhook"""
425        default_body = {
426            "body": notification.body,
427            "severity": notification.severity,
428            "user_email": notification.user.email,
429            "user_username": notification.user.username,
430        }
431        if notification.event and notification.event.user:
432            default_body["event_user_email"] = notification.event.user.get("email", None)
433            default_body["event_user_username"] = notification.event.user.get("username", None)
434        headers = {}
435        if self.webhook_mapping_body:
436            default_body = sanitize_item(
437                self.webhook_mapping_body.evaluate(
438                    user=notification.user,
439                    request=None,
440                    notification=notification,
441                )
442            )
443        if self.webhook_mapping_headers:
444            headers = sanitize_item(
445                self.webhook_mapping_headers.evaluate(
446                    user=notification.user,
447                    request=None,
448                    notification=notification,
449                )
450            )
451
452        def send(**kwargs):
453            try:
454                response = get_http_session().post(
455                    self.webhook_url,
456                    json=default_body,
457                    headers=headers,
458                    **kwargs,
459                )
460                response.raise_for_status()
461            except RequestException as exc:
462                raise NotificationTransportError(
463                    exc.response.text if exc.response is not None else str(exc)
464                ) from exc
465            return [
466                response.status_code,
467                response.text,
468            ]
469
470        if self.webhook_ca:
471            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
472                return send(verify=tls.ca_cert)
473        return send()
474
475    def send_webhook_slack(self, notification: Notification) -> list[str]:
476        """Send notification to slack or slack-compatible endpoints"""
477        fields = [
478            {
479                "title": _("Severity"),
480                "value": notification.severity,
481                "short": True,
482            },
483            {
484                "title": _("Dispatched for user"),
485                "value": str(notification.user),
486                "short": True,
487            },
488        ]
489        if notification.event:
490            if notification.event.user:
491                fields.append(
492                    {
493                        "title": _("Event user"),
494                        "value": str(notification.event.user.get("username")),
495                        "short": True,
496                    },
497                )
498            for key, value in notification.event.context.items():
499                if not isinstance(value, str):
500                    continue
501                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
502                if len(fields) >= DISCORD_FIELD_LIMIT:
503                    continue
504                fields.append({"title": key[:256], "value": value[:1024]})
505        body = {
506            "username": "authentik",
507            "icon_url": "https://goauthentik.io/img/icon.png",
508            "attachments": [
509                {
510                    "author_name": "authentik",
511                    "author_link": "https://goauthentik.io",
512                    "author_icon": "https://goauthentik.io/img/icon.png",
513                    "title": notification.body,
514                    "color": "#fd4b2d",
515                    "fields": fields,
516                    "footer": f"authentik {authentik_full_version()}",
517                }
518            ],
519        }
520        if notification.event:
521            body["attachments"][0]["title"] = notification.event.action
522        try:
523            response = get_http_session().post(self.webhook_url, json=body)
524            response.raise_for_status()
525        except RequestException as exc:
526            text = exc.response.text if exc.response is not None else str(exc)
527            raise NotificationTransportError(text) from exc
528        return [
529            response.status_code,
530            response.text,
531        ]
532
533    def send_email(self, notification: Notification) -> list[str]:
534        """Send notification via global email configuration"""
535        from authentik.stages.email.tasks import send_mail
536
537        if notification.user.email.strip() == "":
538            LOGGER.info(
539                "Discarding notification as user has no email address",
540                user=notification.user,
541                notification=notification,
542            )
543            return None
544        context = {
545            "key_value": {
546                "user_email": notification.user.email,
547                "user_username": notification.user.username,
548            },
549            "body": notification.body,
550            "title": "",
551        }
552        if notification.event and notification.event.user:
553            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
554            context["key_value"]["event_user_username"] = notification.event.user.get(
555                "username", None
556            )
557        if notification.hyperlink:
558            context["link"] = {
559                "target": notification.hyperlink,
560                "label": notification.hyperlink_label,
561            }
562        if notification.event:
563            context["title"] += notification.event.action
564            for key, value in notification.event.context.items():
565                if not isinstance(value, str):
566                    continue
567                context["key_value"][key] = value
568        else:
569            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
570        # TODO: improve permission check
571        if notification.user.is_superuser:
572            context["source"] = {
573                "from": self.name,
574            }
575        mail = TemplateEmailMessage(
576            subject=self.email_subject_prefix + context["title"],
577            to=[(notification.user.name, notification.user.email)],
578            language=notification.user.locale(),
579            template_name=self.email_template,
580            template_context=context,
581        )
582        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
583        return []
584
585    @property
586    def serializer(self) -> type[Serializer]:
587        from authentik.events.api.notification_transports import (
588            NotificationTransportSerializer,
589        )
590
591        return NotificationTransportSerializer
592
593    def __str__(self) -> str:
594        return f"Notification Transport {self.name}"
595
596    class Meta:
597        verbose_name = _("Notification Transport")
598        verbose_name_plural = _("Notification Transports")
599
600
601class NotificationSeverity(models.TextChoices):
602    """Severity images that a notification can have"""
603
604    NOTICE = "notice", _("Notice")
605    WARNING = "warning", _("Warning")
606    ALERT = "alert", _("Alert")
607
608
609class Notification(SerializerModel):
610    """Event Notification"""
611
612    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
613    severity = models.TextField(choices=NotificationSeverity.choices)
614    body = models.TextField()
615    hyperlink = models.TextField(blank=True, null=True, max_length=4096)
616    hyperlink_label = models.TextField(blank=True, null=True)
617    created = models.DateTimeField(auto_now_add=True)
618    event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
619    seen = models.BooleanField(default=False)
620    user = models.ForeignKey(User, on_delete=models.CASCADE)
621
622    @property
623    def serializer(self) -> type[Serializer]:
624        from authentik.events.api.notifications import NotificationSerializer
625
626        return NotificationSerializer
627
628    def __str__(self) -> str:
629        body_trunc = (
630            (self.body[:NOTIFICATION_SUMMARY_LENGTH] + "..")
631            if len(self.body) > NOTIFICATION_SUMMARY_LENGTH
632            else self.body
633        )
634        return f"Notification for user {self.user_id}: {body_trunc}"
635
636    class Meta:
637        verbose_name = _("Notification")
638        verbose_name_plural = _("Notifications")
639
640
641class NotificationRule(TasksModel, SerializerModel, PolicyBindingModel):
642    """Decide when to create a Notification based on policies attached to this object."""
643
644    name = models.TextField(unique=True)
645    transports = models.ManyToManyField(
646        NotificationTransport,
647        help_text=_(
648            "Select which transports should be used to notify the user. If none are "
649            "selected, the notification will only be shown in the authentik UI."
650        ),
651        blank=True,
652    )
653    severity = models.TextField(
654        choices=NotificationSeverity.choices,
655        default=NotificationSeverity.NOTICE,
656        help_text=_("Controls which severity level the created notifications will have."),
657    )
658    destination_group = models.ForeignKey(
659        Group,
660        help_text=_(
661            "Define which group of users this notification should be sent and shown to. "
662            "If left empty, Notification won't ben sent."
663        ),
664        null=True,
665        blank=True,
666        on_delete=models.SET_NULL,
667    )
668    destination_event_user = models.BooleanField(
669        default=False,
670        help_text=_(
671            "When enabled, notification will be sent to user the user that triggered the event."
672            "When destination_group is configured, notification is sent to both."
673        ),
674    )
675
676    def destination_users(self, event: Event) -> Generator[User, Any]:
677        if self.destination_event_user and event.user.get("pk"):
678            yield User(pk=event.user.get("pk"))
679        if self.destination_group:
680            yield from self.destination_group.users.all()
681
682    @property
683    def serializer(self) -> type[Serializer]:
684        from authentik.events.api.notification_rules import NotificationRuleSerializer
685
686        return NotificationRuleSerializer
687
688    def __str__(self) -> str:
689        return f"Notification Rule {self.name}"
690
691    class Meta:
692        verbose_name = _("Notification Rule")
693        verbose_name_plural = _("Notification Rules")
694
695
696class NotificationWebhookMapping(PropertyMapping):
697    """Modify the payload of outgoing webhook requests"""
698
699    @property
700    def component(self) -> str:
701        return "ak-property-mapping-notification-form"
702
703    @property
704    def serializer(self) -> type[type[Serializer]]:
705        from authentik.events.api.notification_mappings import (
706            NotificationWebhookMappingSerializer,
707        )
708
709        return NotificationWebhookMappingSerializer
710
711    def __str__(self):
712        return f"Webhook Mapping {self.name}"
713
714    class Meta:
715        verbose_name = _("Webhook Mapping")
716        verbose_name_plural = _("Webhook Mappings")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
DISCORD_FIELD_LIMIT = 25
NOTIFICATION_SUMMARY_LENGTH = 75
def default_event_duration():
61def default_event_duration():
62    """Default duration an Event is saved.
63    This is used as a fallback when no brand is available"""
64    try:
65        tenant = get_current_tenant(only=["event_retention"])
66        return now() + timedelta_from_string(tenant.event_retention)
67    except Tenant.DoesNotExist:
68        return now() + timedelta(days=365)

Default duration an Event is saved. This is used as a fallback when no brand is available

def default_brand():
71def default_brand():
72    """Get a default value for brand"""
73    return sanitize_dict(model_to_dict(DEFAULT_BRAND))

Get a default value for brand

@lru_cache
def django_app_names() -> list[str]:
76@lru_cache
77def django_app_names() -> list[str]:
78    """Get a cached list of all django apps' names (not labels)"""
79    return [x.name for x in apps.app_configs.values()]

Get a cached list of all django apps' names (not labels)

class NotificationTransportError(authentik.lib.sentry.SentryIgnoredException):
82class NotificationTransportError(SentryIgnoredException):
83    """Error raised when a notification fails to be delivered"""

Error raised when a notification fails to be delivered

class EventAction(django.db.models.enums.TextChoices):
 86class EventAction(models.TextChoices):
 87    """All possible actions to save into the events log"""
 88
 89    LOGIN = "login"
 90    LOGIN_FAILED = "login_failed"
 91    LOGOUT = "logout"
 92
 93    USER_WRITE = "user_write"
 94    SUSPICIOUS_REQUEST = "suspicious_request"
 95    PASSWORD_SET = "password_set"  # noqa # nosec
 96
 97    SECRET_VIEW = "secret_view"  # noqa # nosec
 98    SECRET_ROTATE = "secret_rotate"  # noqa # nosec
 99
100    INVITE_USED = "invitation_used"
101
102    AUTHORIZE_APPLICATION = "authorize_application"
103    SOURCE_LINKED = "source_linked"
104
105    IMPERSONATION_STARTED = "impersonation_started"
106    IMPERSONATION_ENDED = "impersonation_ended"
107
108    FLOW_EXECUTION = "flow_execution"
109    POLICY_EXECUTION = "policy_execution"
110    POLICY_EXCEPTION = "policy_exception"
111    PROPERTY_MAPPING_EXCEPTION = "property_mapping_exception"
112
113    SYSTEM_TASK_EXECUTION = "system_task_execution"
114    SYSTEM_TASK_EXCEPTION = "system_task_exception"
115    SYSTEM_EXCEPTION = "system_exception"
116
117    CONFIGURATION_ERROR = "configuration_error"
118    CONFIGURATION_WARNING = "configuration_warning"
119
120    MODEL_CREATED = "model_created"
121    MODEL_UPDATED = "model_updated"
122    MODEL_DELETED = "model_deleted"
123    EMAIL_SENT = "email_sent"
124
125    UPDATE_AVAILABLE = "update_available"
126
127    EXPORT_READY = "export_ready"
128
129    REVIEW_INITIATED = "review_initiated"
130    REVIEW_OVERDUE = "review_overdue"
131    REVIEW_ATTESTED = "review_attested"
132    REVIEW_COMPLETED = "review_completed"
133
134    CUSTOM_PREFIX = "custom_"

All possible actions to save into the events log

LOGIN_FAILED = EventAction.LOGIN_FAILED
USER_WRITE = EventAction.USER_WRITE
SUSPICIOUS_REQUEST = EventAction.SUSPICIOUS_REQUEST
PASSWORD_SET = EventAction.PASSWORD_SET
SECRET_VIEW = EventAction.SECRET_VIEW
SECRET_ROTATE = EventAction.SECRET_ROTATE
INVITE_USED = EventAction.INVITE_USED
AUTHORIZE_APPLICATION = EventAction.AUTHORIZE_APPLICATION
SOURCE_LINKED = EventAction.SOURCE_LINKED
IMPERSONATION_STARTED = EventAction.IMPERSONATION_STARTED
IMPERSONATION_ENDED = EventAction.IMPERSONATION_ENDED
FLOW_EXECUTION = EventAction.FLOW_EXECUTION
POLICY_EXECUTION = EventAction.POLICY_EXECUTION
POLICY_EXCEPTION = EventAction.POLICY_EXCEPTION
PROPERTY_MAPPING_EXCEPTION = EventAction.PROPERTY_MAPPING_EXCEPTION
SYSTEM_TASK_EXECUTION = EventAction.SYSTEM_TASK_EXECUTION
SYSTEM_TASK_EXCEPTION = EventAction.SYSTEM_TASK_EXCEPTION
SYSTEM_EXCEPTION = EventAction.SYSTEM_EXCEPTION
CONFIGURATION_ERROR = EventAction.CONFIGURATION_ERROR
CONFIGURATION_WARNING = EventAction.CONFIGURATION_WARNING
MODEL_CREATED = EventAction.MODEL_CREATED
MODEL_UPDATED = EventAction.MODEL_UPDATED
MODEL_DELETED = EventAction.MODEL_DELETED
EMAIL_SENT = EventAction.EMAIL_SENT
UPDATE_AVAILABLE = EventAction.UPDATE_AVAILABLE
EXPORT_READY = EventAction.EXPORT_READY
REVIEW_INITIATED = EventAction.REVIEW_INITIATED
REVIEW_OVERDUE = EventAction.REVIEW_OVERDUE
REVIEW_ATTESTED = EventAction.REVIEW_ATTESTED
REVIEW_COMPLETED = EventAction.REVIEW_COMPLETED
CUSTOM_PREFIX = EventAction.CUSTOM_PREFIX
137class Event(SerializerModel, ExpiringModel):
138    """An individual Audit/Metrics/Notification/Error Event"""
139
140    event_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
141    user = models.JSONField(default=dict)
142    action = models.TextField(choices=EventAction.choices)
143    app = models.TextField()
144    context = models.JSONField(default=dict, blank=True)
145    client_ip = models.GenericIPAddressField(null=True)
146    created = models.DateTimeField(auto_now_add=True)
147    brand = models.JSONField(default=default_brand, blank=True)
148
149    # Shadow the expires attribute from ExpiringModel to override the default duration
150    expires = models.DateTimeField(default=default_event_duration)
151
152    @staticmethod
153    def _get_app_from_request(request: HttpRequest) -> str:
154        if not isinstance(request, HttpRequest):
155            return ""
156        return request.resolver_match.app_name
157
158    @staticmethod
159    def new(
160        action: str | EventAction,
161        app: str | None = None,
162        **kwargs,
163    ) -> Event:
164        """Create new Event instance from arguments. Instance is NOT saved."""
165        if not isinstance(action, EventAction):
166            action = EventAction.CUSTOM_PREFIX + action
167        if not app:
168            current = currentframe()
169            parent = current.f_back
170            app = parent.f_globals["__name__"]
171            # Attempt to match the calling module to the django app it belongs to
172            # if we can't find a match, keep the module name
173            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
174            # Also ensure that closest django app has the correct prefix
175            if len(django_apps) > 0 and django_apps[0].startswith(app):
176                app = django_apps[0]
177        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
178        event = Event(action=action, app=app, context=cleaned_kwargs)
179        return event
180
181    def with_exception(self, exc: Exception) -> Event:
182        """Add data from 'exc' to the event in a database-saveable format"""
183        self.context.setdefault("message", str(exc))
184        self.context["exception"] = exception_to_dict(exc)
185        return self
186
187    def set_user(self, user: User) -> Event:
188        """Set `.user` based on user, ensuring the correct attributes are copied.
189        This should only be used when self.from_http is *not* used."""
190        self.user = get_user(user)
191        return self
192
193    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
194        """Add data from a Django-HttpRequest, allowing the creation of
195        Events independently from requests.
196        `user` arguments optionally overrides user from requests."""
197        if request:
198            from authentik.flows.views.executor import QS_QUERY
199
200            self.context["http_request"] = {
201                "path": request.path,
202                "method": request.method,
203                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
204                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
205            }
206            if hasattr(request, "request_id"):
207                self.context["http_request"]["request_id"] = request.request_id
208            # Special case for events created during flow execution
209            # since they keep the http query within a wrapped query
210            if QS_QUERY in self.context["http_request"]["args"]:
211                wrapped = self.context["http_request"]["args"][QS_QUERY]
212                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
213        if hasattr(request, "brand"):
214            brand: Brand = request.brand
215            self.brand = sanitize_dict(model_to_dict(brand))
216        if hasattr(request, "user"):
217            self.user = get_user(request.user)
218        if user:
219            self.user = get_user(user)
220        if hasattr(request, "session"):
221            from authentik.flows.views.executor import SESSION_KEY_PLAN
222
223            # Check if we're currently impersonating, and add that user
224            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
225                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
226                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
227            # Special case for events that happen during a flow, the user might not be authenticated
228            # yet but is a pending user instead
229            if SESSION_KEY_PLAN in request.session:
230                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
231
232                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
233                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
234                # Only save `authenticated_as` if there's a different pending user in the flow
235                # than the user that is authenticated
236                if pending_user and (
237                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
238                    or (not pending_user.pk)
239                ):
240                    orig_user = self.user.copy()
241
242                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
243        # User 255.255.255.255 as fallback if IP cannot be determined
244        self.client_ip = ClientIPMiddleware.get_client_ip(request)
245        # Enrich event data
246        for processor in get_context_processors():
247            processor.enrich_event(self)
248        # If there's no app set, we get it from the requests too
249        if not self.app:
250            self.app = Event._get_app_from_request(request)
251        self.save()
252        return self
253
254    @staticmethod
255    def log_deprecation(
256        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
257    ):
258        query = Q(
259            action=EventAction.CONFIGURATION_WARNING,
260            context__deprecation=identifier,
261        )
262        cause = str(cause)
263        if cause:
264            query &= Q(context__cause=cause)
265        if Event.objects.filter(query).exists():
266            return
267        event = Event.new(
268            EventAction.CONFIGURATION_WARNING,
269            deprecation=identifier,
270            message=message,
271            cause=cause,
272            **kwargs,
273        )
274        event.expires = now() + timedelta(days=expiry_days)
275        event.save()
276
277    def save(self, *args, **kwargs):
278        if self._state.adding:
279            LOGGER.info(
280                "Created Event",
281                action=self.action,
282                context=self.context,
283                client_ip=self.client_ip,
284                user=self.user,
285            )
286        super().save(*args, **kwargs)
287
288    @property
289    def serializer(self) -> type[Serializer]:
290        from authentik.events.api.events import EventSerializer
291
292        return EventSerializer
293
294    @property
295    def summary(self) -> str:
296        """Return a summary of this event."""
297        if "message" in self.context:
298            return self.context["message"]
299        return f"{self.action}: {self.context}"
300
301    @property
302    def hyperlink(self) -> str | None:
303        return self.context.get("hyperlink")
304
305    @property
306    def hyperlink_label(self) -> str | None:
307        return self.context.get("hyperlink_label")
308
309    def __str__(self) -> str:
310        return f"Event action={self.action} user={self.user} context={self.context}"
311
312    class Meta:
313        verbose_name = _("Event")
314        verbose_name_plural = _("Events")
315        indexes = ExpiringModel.Meta.indexes + [
316            models.Index(fields=["action"]),
317            models.Index(fields=["user"]),
318            models.Index(fields=["app"]),
319            models.Index(fields=["created"]),
320            models.Index(fields=["client_ip"]),
321            models.Index(
322                models.F("context__authorized_application"),
323                name="authentik_e_ctx_app__idx",
324            ),
325            models.Index(
326                models.F("user__pk"),
327                name="authentik_e_user_pk__idx",
328            ),
329        ]

An individual Audit/Metrics/Notification/Error Event

def event_uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def action(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def app(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def context(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def brand(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

@staticmethod
def new( action: str | EventAction, app: str | None = None, **kwargs) -> Event:
158    @staticmethod
159    def new(
160        action: str | EventAction,
161        app: str | None = None,
162        **kwargs,
163    ) -> Event:
164        """Create new Event instance from arguments. Instance is NOT saved."""
165        if not isinstance(action, EventAction):
166            action = EventAction.CUSTOM_PREFIX + action
167        if not app:
168            current = currentframe()
169            parent = current.f_back
170            app = parent.f_globals["__name__"]
171            # Attempt to match the calling module to the django app it belongs to
172            # if we can't find a match, keep the module name
173            django_apps: list[str] = get_close_matches(app, django_app_names(), n=1)
174            # Also ensure that closest django app has the correct prefix
175            if len(django_apps) > 0 and django_apps[0].startswith(app):
176                app = django_apps[0]
177        cleaned_kwargs = cleanse_dict(sanitize_dict(kwargs))
178        event = Event(action=action, app=app, context=cleaned_kwargs)
179        return event

Create new Event instance from arguments. Instance is NOT saved.

def with_exception(self, exc: Exception) -> Event:
181    def with_exception(self, exc: Exception) -> Event:
182        """Add data from 'exc' to the event in a database-saveable format"""
183        self.context.setdefault("message", str(exc))
184        self.context["exception"] = exception_to_dict(exc)
185        return self

Add data from 'exc' to the event in a database-saveable format

def set_user(self, user: authentik.core.models.User) -> Event:
187    def set_user(self, user: User) -> Event:
188        """Set `.user` based on user, ensuring the correct attributes are copied.
189        This should only be used when self.from_http is *not* used."""
190        self.user = get_user(user)
191        return self

Set .user based on user, ensuring the correct attributes are copied. This should only be used when self.from_http is not used.

def from_http( self, request: django.http.request.HttpRequest, user: authentik.core.models.User | None = None) -> Event:
193    def from_http(self, request: HttpRequest, user: User | None = None) -> Event:
194        """Add data from a Django-HttpRequest, allowing the creation of
195        Events independently from requests.
196        `user` arguments optionally overrides user from requests."""
197        if request:
198            from authentik.flows.views.executor import QS_QUERY
199
200            self.context["http_request"] = {
201                "path": request.path,
202                "method": request.method,
203                "args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
204                "user_agent": request.META.get("HTTP_USER_AGENT", ""),
205            }
206            if hasattr(request, "request_id"):
207                self.context["http_request"]["request_id"] = request.request_id
208            # Special case for events created during flow execution
209            # since they keep the http query within a wrapped query
210            if QS_QUERY in self.context["http_request"]["args"]:
211                wrapped = self.context["http_request"]["args"][QS_QUERY]
212                self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
213        if hasattr(request, "brand"):
214            brand: Brand = request.brand
215            self.brand = sanitize_dict(model_to_dict(brand))
216        if hasattr(request, "user"):
217            self.user = get_user(request.user)
218        if user:
219            self.user = get_user(user)
220        if hasattr(request, "session"):
221            from authentik.flows.views.executor import SESSION_KEY_PLAN
222
223            # Check if we're currently impersonating, and add that user
224            if SESSION_KEY_IMPERSONATE_ORIGINAL_USER in request.session:
225                self.user = get_user(request.session[SESSION_KEY_IMPERSONATE_ORIGINAL_USER])
226                self.user["on_behalf_of"] = get_user(request.session[SESSION_KEY_IMPERSONATE_USER])
227            # Special case for events that happen during a flow, the user might not be authenticated
228            # yet but is a pending user instead
229            if SESSION_KEY_PLAN in request.session:
230                from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
231
232                plan: FlowPlan = request.session[SESSION_KEY_PLAN]
233                pending_user = plan.context.get(PLAN_CONTEXT_PENDING_USER, None)
234                # Only save `authenticated_as` if there's a different pending user in the flow
235                # than the user that is authenticated
236                if pending_user and (
237                    (pending_user.pk and pending_user.pk != self.user.get("pk"))
238                    or (not pending_user.pk)
239                ):
240                    orig_user = self.user.copy()
241
242                    self.user = {"authenticated_as": orig_user, **get_user(pending_user)}
243        # User 255.255.255.255 as fallback if IP cannot be determined
244        self.client_ip = ClientIPMiddleware.get_client_ip(request)
245        # Enrich event data
246        for processor in get_context_processors():
247            processor.enrich_event(self)
248        # If there's no app set, we get it from the requests too
249        if not self.app:
250            self.app = Event._get_app_from_request(request)
251        self.save()
252        return self

Add data from a Django-HttpRequest, allowing the creation of Events independently from requests. user arguments optionally overrides user from requests.

@staticmethod
def log_deprecation( identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs):
254    @staticmethod
255    def log_deprecation(
256        identifier: str, message: str, cause: str | None = None, expiry_days=30, **kwargs
257    ):
258        query = Q(
259            action=EventAction.CONFIGURATION_WARNING,
260            context__deprecation=identifier,
261        )
262        cause = str(cause)
263        if cause:
264            query &= Q(context__cause=cause)
265        if Event.objects.filter(query).exists():
266            return
267        event = Event.new(
268            EventAction.CONFIGURATION_WARNING,
269            deprecation=identifier,
270            message=message,
271            cause=cause,
272            **kwargs,
273        )
274        event.expires = now() + timedelta(days=expiry_days)
275        event.save()
def save(self, *args, **kwargs):
277    def save(self, *args, **kwargs):
278        if self._state.adding:
279            LOGGER.info(
280                "Created Event",
281                action=self.action,
282                context=self.context,
283                client_ip=self.client_ip,
284                user=self.user,
285            )
286        super().save(*args, **kwargs)

Save the current instance. Override this in a subclass if you want to control the saving process.

The 'force_insert' and 'force_update' parameters can be used to insist that the "save" must be an SQL insert or update (or equivalent for non-SQL backends), respectively. Normally, they should not be set.

serializer: type[rest_framework.serializers.Serializer]
288    @property
289    def serializer(self) -> type[Serializer]:
290        from authentik.events.api.events import EventSerializer
291
292        return EventSerializer

Get serializer for this model

summary: str
294    @property
295    def summary(self) -> str:
296        """Return a summary of this event."""
297        if "message" in self.context:
298            return self.context["message"]
299        return f"{self.action}: {self.context}"

Return a summary of this event.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def get_action_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

notification_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class Event.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Event.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class TransportMode(django.db.models.enums.TextChoices):
332class TransportMode(models.TextChoices):
333    """Modes that a notification transport can send a notification"""
334
335    LOCAL = "local", _("authentik inbuilt notifications")
336    WEBHOOK = "webhook", _("Generic Webhook")
337    WEBHOOK_SLACK = "webhook_slack", _("Slack Webhook (Slack/Discord)")
338    EMAIL = "email", _("Email")

Modes that a notification transport can send a notification

WEBHOOK_SLACK = TransportMode.WEBHOOK_SLACK
class NotificationTransport(authentik.tasks.models.TasksModel, authentik.lib.models.SerializerModel):
341class NotificationTransport(TasksModel, SerializerModel):
342    """Action which is executed when a Rule matches"""
343
344    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
345
346    name = models.TextField(unique=True)
347    mode = models.TextField(choices=TransportMode.choices, default=TransportMode.LOCAL)
348    send_once = models.BooleanField(
349        default=False,
350        help_text=_(
351            "Only send notification once, for example when sending a webhook into a chat channel."
352        ),
353    )
354
355    email_subject_prefix = models.TextField(default="authentik Notification: ", blank=True)
356    email_template = models.TextField(default=EmailTemplates.EVENT_NOTIFICATION)
357
358    webhook_url = models.TextField(blank=True, validators=[DomainlessURLValidator()])
359    webhook_ca = models.ForeignKey(
360        CertificateKeyPair,
361        null=True,
362        default=None,
363        on_delete=models.SET_DEFAULT,
364        help_text=_(
365            "When set, the selected certificate is used to "
366            "validate the certificate of the webhook server."
367        ),
368    )
369    webhook_mapping_body = models.ForeignKey(
370        "NotificationWebhookMapping",
371        on_delete=models.SET_DEFAULT,
372        null=True,
373        default=None,
374        related_name="+",
375        help_text=_(
376            "Customize the body of the request. "
377            "Mapping should return data that is JSON-serializable."
378        ),
379    )
380    webhook_mapping_headers = models.ForeignKey(
381        "NotificationWebhookMapping",
382        on_delete=models.SET_DEFAULT,
383        null=True,
384        default=None,
385        related_name="+",
386        help_text=_(
387            "Configure additional headers to be sent. "
388            "Mapping should return a dictionary of key-value pairs"
389        ),
390    )
391
392    def send(self, notification: Notification) -> list[str]:
393        """Send notification to user, called from async task"""
394        if self.mode == TransportMode.LOCAL:
395            return self.send_local(notification)
396        if self.mode == TransportMode.WEBHOOK:
397            return self.send_webhook(notification)
398        if self.mode == TransportMode.WEBHOOK_SLACK:
399            return self.send_webhook_slack(notification)
400        if self.mode == TransportMode.EMAIL:
401            return self.send_email(notification)
402        raise ValueError(f"Invalid mode {self.mode} set")
403
404    def send_local(self, notification: Notification) -> list[str]:
405        """Local notification delivery"""
406        if self.webhook_mapping_body:
407            self.webhook_mapping_body.evaluate(
408                user=notification.user,
409                request=None,
410                notification=notification,
411            )
412        notification.save()
413        layer = get_channel_layer()
414        layer.group_send_blocking(
415            build_user_group(notification.user),
416            {
417                "type": "event.notification",
418                "id": str(notification.pk),
419                "data": notification.serializer(notification).data,
420            },
421        )
422        return []
423
424    def send_webhook(self, notification: Notification) -> list[str]:
425        """Send notification to generic webhook"""
426        default_body = {
427            "body": notification.body,
428            "severity": notification.severity,
429            "user_email": notification.user.email,
430            "user_username": notification.user.username,
431        }
432        if notification.event and notification.event.user:
433            default_body["event_user_email"] = notification.event.user.get("email", None)
434            default_body["event_user_username"] = notification.event.user.get("username", None)
435        headers = {}
436        if self.webhook_mapping_body:
437            default_body = sanitize_item(
438                self.webhook_mapping_body.evaluate(
439                    user=notification.user,
440                    request=None,
441                    notification=notification,
442                )
443            )
444        if self.webhook_mapping_headers:
445            headers = sanitize_item(
446                self.webhook_mapping_headers.evaluate(
447                    user=notification.user,
448                    request=None,
449                    notification=notification,
450                )
451            )
452
453        def send(**kwargs):
454            try:
455                response = get_http_session().post(
456                    self.webhook_url,
457                    json=default_body,
458                    headers=headers,
459                    **kwargs,
460                )
461                response.raise_for_status()
462            except RequestException as exc:
463                raise NotificationTransportError(
464                    exc.response.text if exc.response is not None else str(exc)
465                ) from exc
466            return [
467                response.status_code,
468                response.text,
469            ]
470
471        if self.webhook_ca:
472            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
473                return send(verify=tls.ca_cert)
474        return send()
475
476    def send_webhook_slack(self, notification: Notification) -> list[str]:
477        """Send notification to slack or slack-compatible endpoints"""
478        fields = [
479            {
480                "title": _("Severity"),
481                "value": notification.severity,
482                "short": True,
483            },
484            {
485                "title": _("Dispatched for user"),
486                "value": str(notification.user),
487                "short": True,
488            },
489        ]
490        if notification.event:
491            if notification.event.user:
492                fields.append(
493                    {
494                        "title": _("Event user"),
495                        "value": str(notification.event.user.get("username")),
496                        "short": True,
497                    },
498                )
499            for key, value in notification.event.context.items():
500                if not isinstance(value, str):
501                    continue
502                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
503                if len(fields) >= DISCORD_FIELD_LIMIT:
504                    continue
505                fields.append({"title": key[:256], "value": value[:1024]})
506        body = {
507            "username": "authentik",
508            "icon_url": "https://goauthentik.io/img/icon.png",
509            "attachments": [
510                {
511                    "author_name": "authentik",
512                    "author_link": "https://goauthentik.io",
513                    "author_icon": "https://goauthentik.io/img/icon.png",
514                    "title": notification.body,
515                    "color": "#fd4b2d",
516                    "fields": fields,
517                    "footer": f"authentik {authentik_full_version()}",
518                }
519            ],
520        }
521        if notification.event:
522            body["attachments"][0]["title"] = notification.event.action
523        try:
524            response = get_http_session().post(self.webhook_url, json=body)
525            response.raise_for_status()
526        except RequestException as exc:
527            text = exc.response.text if exc.response is not None else str(exc)
528            raise NotificationTransportError(text) from exc
529        return [
530            response.status_code,
531            response.text,
532        ]
533
534    def send_email(self, notification: Notification) -> list[str]:
535        """Send notification via global email configuration"""
536        from authentik.stages.email.tasks import send_mail
537
538        if notification.user.email.strip() == "":
539            LOGGER.info(
540                "Discarding notification as user has no email address",
541                user=notification.user,
542                notification=notification,
543            )
544            return None
545        context = {
546            "key_value": {
547                "user_email": notification.user.email,
548                "user_username": notification.user.username,
549            },
550            "body": notification.body,
551            "title": "",
552        }
553        if notification.event and notification.event.user:
554            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
555            context["key_value"]["event_user_username"] = notification.event.user.get(
556                "username", None
557            )
558        if notification.hyperlink:
559            context["link"] = {
560                "target": notification.hyperlink,
561                "label": notification.hyperlink_label,
562            }
563        if notification.event:
564            context["title"] += notification.event.action
565            for key, value in notification.event.context.items():
566                if not isinstance(value, str):
567                    continue
568                context["key_value"][key] = value
569        else:
570            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
571        # TODO: improve permission check
572        if notification.user.is_superuser:
573            context["source"] = {
574                "from": self.name,
575            }
576        mail = TemplateEmailMessage(
577            subject=self.email_subject_prefix + context["title"],
578            to=[(notification.user.name, notification.user.email)],
579            language=notification.user.locale(),
580            template_name=self.email_template,
581            template_context=context,
582        )
583        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
584        return []
585
586    @property
587    def serializer(self) -> type[Serializer]:
588        from authentik.events.api.notification_transports import (
589            NotificationTransportSerializer,
590        )
591
592        return NotificationTransportSerializer
593
594    def __str__(self) -> str:
595        return f"Notification Transport {self.name}"
596
597    class Meta:
598        verbose_name = _("Notification Transport")
599        verbose_name_plural = _("Notification Transports")

Action which is executed when a Rule matches

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def send_once(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def email_subject_prefix(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def email_template(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def webhook_url(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

webhook_ca

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

webhook_mapping_body

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

webhook_mapping_headers

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def send(self, notification: Notification) -> list[str]:
392    def send(self, notification: Notification) -> list[str]:
393        """Send notification to user, called from async task"""
394        if self.mode == TransportMode.LOCAL:
395            return self.send_local(notification)
396        if self.mode == TransportMode.WEBHOOK:
397            return self.send_webhook(notification)
398        if self.mode == TransportMode.WEBHOOK_SLACK:
399            return self.send_webhook_slack(notification)
400        if self.mode == TransportMode.EMAIL:
401            return self.send_email(notification)
402        raise ValueError(f"Invalid mode {self.mode} set")

Send notification to user, called from async task

def send_local(self, notification: Notification) -> list[str]:
404    def send_local(self, notification: Notification) -> list[str]:
405        """Local notification delivery"""
406        if self.webhook_mapping_body:
407            self.webhook_mapping_body.evaluate(
408                user=notification.user,
409                request=None,
410                notification=notification,
411            )
412        notification.save()
413        layer = get_channel_layer()
414        layer.group_send_blocking(
415            build_user_group(notification.user),
416            {
417                "type": "event.notification",
418                "id": str(notification.pk),
419                "data": notification.serializer(notification).data,
420            },
421        )
422        return []

Local notification delivery

def send_webhook(self, notification: Notification) -> list[str]:
424    def send_webhook(self, notification: Notification) -> list[str]:
425        """Send notification to generic webhook"""
426        default_body = {
427            "body": notification.body,
428            "severity": notification.severity,
429            "user_email": notification.user.email,
430            "user_username": notification.user.username,
431        }
432        if notification.event and notification.event.user:
433            default_body["event_user_email"] = notification.event.user.get("email", None)
434            default_body["event_user_username"] = notification.event.user.get("username", None)
435        headers = {}
436        if self.webhook_mapping_body:
437            default_body = sanitize_item(
438                self.webhook_mapping_body.evaluate(
439                    user=notification.user,
440                    request=None,
441                    notification=notification,
442                )
443            )
444        if self.webhook_mapping_headers:
445            headers = sanitize_item(
446                self.webhook_mapping_headers.evaluate(
447                    user=notification.user,
448                    request=None,
449                    notification=notification,
450                )
451            )
452
453        def send(**kwargs):
454            try:
455                response = get_http_session().post(
456                    self.webhook_url,
457                    json=default_body,
458                    headers=headers,
459                    **kwargs,
460                )
461                response.raise_for_status()
462            except RequestException as exc:
463                raise NotificationTransportError(
464                    exc.response.text if exc.response is not None else str(exc)
465                ) from exc
466            return [
467                response.status_code,
468                response.text,
469            ]
470
471        if self.webhook_ca:
472            with DockerInlineTLS(self.webhook_ca, authentication_kp=None) as tls:
473                return send(verify=tls.ca_cert)
474        return send()

Send notification to generic webhook

def send_webhook_slack(self, notification: Notification) -> list[str]:
476    def send_webhook_slack(self, notification: Notification) -> list[str]:
477        """Send notification to slack or slack-compatible endpoints"""
478        fields = [
479            {
480                "title": _("Severity"),
481                "value": notification.severity,
482                "short": True,
483            },
484            {
485                "title": _("Dispatched for user"),
486                "value": str(notification.user),
487                "short": True,
488            },
489        ]
490        if notification.event:
491            if notification.event.user:
492                fields.append(
493                    {
494                        "title": _("Event user"),
495                        "value": str(notification.event.user.get("username")),
496                        "short": True,
497                    },
498                )
499            for key, value in notification.event.context.items():
500                if not isinstance(value, str):
501                    continue
502                # https://birdie0.github.io/discord-webhooks-guide/other/field_limits.html
503                if len(fields) >= DISCORD_FIELD_LIMIT:
504                    continue
505                fields.append({"title": key[:256], "value": value[:1024]})
506        body = {
507            "username": "authentik",
508            "icon_url": "https://goauthentik.io/img/icon.png",
509            "attachments": [
510                {
511                    "author_name": "authentik",
512                    "author_link": "https://goauthentik.io",
513                    "author_icon": "https://goauthentik.io/img/icon.png",
514                    "title": notification.body,
515                    "color": "#fd4b2d",
516                    "fields": fields,
517                    "footer": f"authentik {authentik_full_version()}",
518                }
519            ],
520        }
521        if notification.event:
522            body["attachments"][0]["title"] = notification.event.action
523        try:
524            response = get_http_session().post(self.webhook_url, json=body)
525            response.raise_for_status()
526        except RequestException as exc:
527            text = exc.response.text if exc.response is not None else str(exc)
528            raise NotificationTransportError(text) from exc
529        return [
530            response.status_code,
531            response.text,
532        ]

Send notification to slack or slack-compatible endpoints

def send_email(self, notification: Notification) -> list[str]:
534    def send_email(self, notification: Notification) -> list[str]:
535        """Send notification via global email configuration"""
536        from authentik.stages.email.tasks import send_mail
537
538        if notification.user.email.strip() == "":
539            LOGGER.info(
540                "Discarding notification as user has no email address",
541                user=notification.user,
542                notification=notification,
543            )
544            return None
545        context = {
546            "key_value": {
547                "user_email": notification.user.email,
548                "user_username": notification.user.username,
549            },
550            "body": notification.body,
551            "title": "",
552        }
553        if notification.event and notification.event.user:
554            context["key_value"]["event_user_email"] = notification.event.user.get("email", None)
555            context["key_value"]["event_user_username"] = notification.event.user.get(
556                "username", None
557            )
558        if notification.hyperlink:
559            context["link"] = {
560                "target": notification.hyperlink,
561                "label": notification.hyperlink_label,
562            }
563        if notification.event:
564            context["title"] += notification.event.action
565            for key, value in notification.event.context.items():
566                if not isinstance(value, str):
567                    continue
568                context["key_value"][key] = value
569        else:
570            context["title"] += notification.body[:NOTIFICATION_SUMMARY_LENGTH]
571        # TODO: improve permission check
572        if notification.user.is_superuser:
573            context["source"] = {
574                "from": self.name,
575            }
576        mail = TemplateEmailMessage(
577            subject=self.email_subject_prefix + context["title"],
578            to=[(notification.user.name, notification.user.email)],
579            language=notification.user.locale(),
580            template_name=self.email_template,
581            template_context=context,
582        )
583        send_mail.send_with_options(args=(mail.__dict__,), rel_obj=self)
584        return []

Send notification via global email configuration

serializer: type[rest_framework.serializers.Serializer]
586    @property
587    def serializer(self) -> type[Serializer]:
588        from authentik.events.api.notification_transports import (
589            NotificationTransportSerializer,
590        )
591
592        return NotificationTransportSerializer

Get serializer for this model

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

def get_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

webhook_ca_id
webhook_mapping_body_id
webhook_mapping_headers_id
def objects(unknown):

The type of the None singleton.

notificationrule_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

lifecyclerule_set

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class NotificationTransport.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class NotificationTransport.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class NotificationSeverity(django.db.models.enums.TextChoices):
602class NotificationSeverity(models.TextChoices):
603    """Severity images that a notification can have"""
604
605    NOTICE = "notice", _("Notice")
606    WARNING = "warning", _("Warning")
607    ALERT = "alert", _("Alert")

Severity images that a notification can have

class Notification(authentik.lib.models.SerializerModel):
610class Notification(SerializerModel):
611    """Event Notification"""
612
613    uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
614    severity = models.TextField(choices=NotificationSeverity.choices)
615    body = models.TextField()
616    hyperlink = models.TextField(blank=True, null=True, max_length=4096)
617    hyperlink_label = models.TextField(blank=True, null=True)
618    created = models.DateTimeField(auto_now_add=True)
619    event = models.ForeignKey(Event, on_delete=models.SET_NULL, null=True, blank=True)
620    seen = models.BooleanField(default=False)
621    user = models.ForeignKey(User, on_delete=models.CASCADE)
622
623    @property
624    def serializer(self) -> type[Serializer]:
625        from authentik.events.api.notifications import NotificationSerializer
626
627        return NotificationSerializer
628
629    def __str__(self) -> str:
630        body_trunc = (
631            (self.body[:NOTIFICATION_SUMMARY_LENGTH] + "..")
632            if len(self.body) > NOTIFICATION_SUMMARY_LENGTH
633            else self.body
634        )
635        return f"Notification for user {self.user_id}: {body_trunc}"
636
637    class Meta:
638        verbose_name = _("Notification")
639        verbose_name_plural = _("Notifications")

Event Notification

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def severity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def body(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

event

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def seen(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

serializer: type[rest_framework.serializers.Serializer]
623    @property
624    def serializer(self) -> type[Serializer]:
625        from authentik.events.api.notifications import NotificationSerializer
626
627        return NotificationSerializer

Get serializer for this model

def get_severity_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

event_id
user_id
def objects(unknown):

The type of the None singleton.

class Notification.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Notification.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

642class NotificationRule(TasksModel, SerializerModel, PolicyBindingModel):
643    """Decide when to create a Notification based on policies attached to this object."""
644
645    name = models.TextField(unique=True)
646    transports = models.ManyToManyField(
647        NotificationTransport,
648        help_text=_(
649            "Select which transports should be used to notify the user. If none are "
650            "selected, the notification will only be shown in the authentik UI."
651        ),
652        blank=True,
653    )
654    severity = models.TextField(
655        choices=NotificationSeverity.choices,
656        default=NotificationSeverity.NOTICE,
657        help_text=_("Controls which severity level the created notifications will have."),
658    )
659    destination_group = models.ForeignKey(
660        Group,
661        help_text=_(
662            "Define which group of users this notification should be sent and shown to. "
663            "If left empty, Notification won't ben sent."
664        ),
665        null=True,
666        blank=True,
667        on_delete=models.SET_NULL,
668    )
669    destination_event_user = models.BooleanField(
670        default=False,
671        help_text=_(
672            "When enabled, notification will be sent to user the user that triggered the event."
673            "When destination_group is configured, notification is sent to both."
674        ),
675    )
676
677    def destination_users(self, event: Event) -> Generator[User, Any]:
678        if self.destination_event_user and event.user.get("pk"):
679            yield User(pk=event.user.get("pk"))
680        if self.destination_group:
681            yield from self.destination_group.users.all()
682
683    @property
684    def serializer(self) -> type[Serializer]:
685        from authentik.events.api.notification_rules import NotificationRuleSerializer
686
687        return NotificationRuleSerializer
688
689    def __str__(self) -> str:
690        return f"Notification Rule {self.name}"
691
692    class Meta:
693        verbose_name = _("Notification Rule")
694        verbose_name_plural = _("Notification Rules")

Decide when to create a Notification based on policies attached to this object.

def name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

transports

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

def severity(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

destination_group

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def destination_event_user(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def destination_users( self, event: Event) -> Generator[authentik.core.models.User, typing.Any]:
677    def destination_users(self, event: Event) -> Generator[User, Any]:
678        if self.destination_event_user and event.user.get("pk"):
679            yield User(pk=event.user.get("pk"))
680        if self.destination_group:
681            yield from self.destination_group.users.all()
serializer: type[rest_framework.serializers.Serializer]
683    @property
684    def serializer(self) -> type[Serializer]:
685        from authentik.events.api.notification_rules import NotificationRuleSerializer
686
687        return NotificationRuleSerializer

Get serializer for this model

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

def get_severity_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

destination_group_id
policybindingmodel_ptr_id
policybindingmodel_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class NotificationRule.DoesNotExist(authentik.policies.models.PolicyBindingModel.DoesNotExist):

The requested object does not exist

class NotificationRule.MultipleObjectsReturned(authentik.policies.models.PolicyBindingModel.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class NotificationWebhookMapping(authentik.core.models.PropertyMapping):
697class NotificationWebhookMapping(PropertyMapping):
698    """Modify the payload of outgoing webhook requests"""
699
700    @property
701    def component(self) -> str:
702        return "ak-property-mapping-notification-form"
703
704    @property
705    def serializer(self) -> type[type[Serializer]]:
706        from authentik.events.api.notification_mappings import (
707            NotificationWebhookMappingSerializer,
708        )
709
710        return NotificationWebhookMappingSerializer
711
712    def __str__(self):
713        return f"Webhook Mapping {self.name}"
714
715    class Meta:
716        verbose_name = _("Webhook Mapping")
717        verbose_name_plural = _("Webhook Mappings")

Modify the payload of outgoing webhook requests

component: str
700    @property
701    def component(self) -> str:
702        return "ak-property-mapping-notification-form"

Return component used to edit this object

serializer: type[type[rest_framework.serializers.Serializer]]
704    @property
705    def serializer(self) -> type[type[Serializer]]:
706        from authentik.events.api.notification_mappings import (
707            NotificationWebhookMappingSerializer,
708        )
709
710        return NotificationWebhookMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

authenticatorsmsstage_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

class NotificationWebhookMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class NotificationWebhookMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.