authentik.events.signals

authentik events signal listener

  1"""authentik events signal listener"""
  2
  3from importlib import import_module
  4from typing import Any
  5
  6from django.conf import settings
  7from django.contrib.auth.signals import user_logged_in, user_logged_out
  8from django.db.models.signals import post_save, pre_delete
  9from django.dispatch import receiver
 10from django.http import HttpRequest
 11from rest_framework.request import Request
 12
 13from authentik.core.models import AuthenticatedSession, User
 14from authentik.core.signals import login_failed, password_changed
 15from authentik.events.models import Event, EventAction
 16from authentik.flows.models import Stage
 17from authentik.flows.planner import (
 18    PLAN_CONTEXT_DEVICE,
 19    PLAN_CONTEXT_OUTPOST,
 20    PLAN_CONTEXT_SOURCE,
 21    FlowPlan,
 22)
 23from authentik.flows.views.executor import SESSION_KEY_PLAN
 24from authentik.stages.invitation.models import Invitation
 25from authentik.stages.invitation.signals import invitation_used
 26from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 27from authentik.stages.user_write.signals import user_write
 28from authentik.tenants.utils import get_current_tenant
 29
 30SESSION_LOGIN_EVENT = "login_event"
 31_session_engine = import_module(settings.SESSION_ENGINE)
 32
 33
 34@receiver(user_logged_in)
 35def on_user_logged_in(sender, request: HttpRequest, user: User, **_):
 36    """Log successful login"""
 37    kwargs = {}
 38    if SESSION_KEY_PLAN in request.session:
 39        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
 40        if PLAN_CONTEXT_SOURCE in flow_plan.context:
 41            # Login request came from an external source, save it in the context
 42            kwargs[PLAN_CONTEXT_SOURCE] = flow_plan.context[PLAN_CONTEXT_SOURCE]
 43        if PLAN_CONTEXT_METHOD in flow_plan.context:
 44            # Save the login method used
 45            kwargs[PLAN_CONTEXT_METHOD] = flow_plan.context[PLAN_CONTEXT_METHOD]
 46            kwargs[PLAN_CONTEXT_METHOD_ARGS] = flow_plan.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
 47        if PLAN_CONTEXT_OUTPOST in flow_plan.context:
 48            # Save outpost context
 49            kwargs[PLAN_CONTEXT_OUTPOST] = flow_plan.context[PLAN_CONTEXT_OUTPOST]
 50        if PLAN_CONTEXT_DEVICE in flow_plan.context:
 51            # Save device
 52            kwargs[PLAN_CONTEXT_DEVICE] = flow_plan.context[PLAN_CONTEXT_DEVICE]
 53    event = Event.new(EventAction.LOGIN, **kwargs).from_http(request, user=user)
 54    request.session[SESSION_LOGIN_EVENT] = event
 55    request.session.save()
 56
 57
 58def get_login_event(request_or_session: HttpRequest | AuthenticatedSession | None) -> Event | None:
 59    """Wrapper to get login event that can be mocked in tests"""
 60    session = None
 61    if not request_or_session:
 62        return None
 63    if isinstance(request_or_session, HttpRequest | Request):
 64        session = request_or_session.session
 65    if isinstance(request_or_session, AuthenticatedSession):
 66        SessionStore = _session_engine.SessionStore
 67        session = SessionStore(request_or_session.session.session_key)
 68    return session.get(SESSION_LOGIN_EVENT, None)
 69
 70
 71@receiver(user_logged_out)
 72def on_user_logged_out(sender, request: HttpRequest, user: User, **kwargs):
 73    """Log successfully logout"""
 74    # Check if this even comes from the user_login stage's middleware, which will set an extra
 75    # argument
 76    event = Event.new(EventAction.LOGOUT)
 77    if "event_extra" in kwargs:
 78        event.context.update(kwargs["event_extra"])
 79    event.from_http(request, user=user)
 80
 81
 82@receiver(user_write)
 83def on_user_write(sender, request: HttpRequest, user: User, data: dict[str, Any], **kwargs):
 84    """Log User write"""
 85    data["created"] = kwargs.get("created", False)
 86    Event.new(EventAction.USER_WRITE, **data).from_http(request, user=user)
 87
 88
 89@receiver(login_failed)
 90def on_login_failed(
 91    signal,
 92    sender,
 93    credentials: dict[str, str],
 94    request: HttpRequest,
 95    stage: Stage | None = None,
 96    context: dict[str, Any] | None = None,
 97    **kwargs,
 98):
 99    """Failed Login, authentik custom event"""
100    user = User.objects.filter(username=credentials.get("username")).first()
101    context = context or {}
102    Event.new(EventAction.LOGIN_FAILED, **credentials, stage=stage, **context).from_http(
103        request, user
104    )
105
106
107@receiver(invitation_used)
108def on_invitation_used(sender, request: HttpRequest, invitation: Invitation, **_):
109    """Log Invitation usage"""
110    Event.new(EventAction.INVITE_USED, invitation_uuid=invitation.invite_uuid.hex).from_http(
111        request
112    )
113
114
115@receiver(password_changed)
116def on_password_changed(sender, user: User, password: str, request: HttpRequest | None, **_):
117    """Log password change"""
118    Event.new(EventAction.PASSWORD_SET).from_http(request, user=user)
119
120
121@receiver(post_save, sender=Event)
122def event_post_save_notification(sender, instance: Event, **_):
123    """Start task to check if any policies trigger an notification on this event"""
124    from authentik.events.tasks import event_trigger_dispatch
125
126    event_trigger_dispatch.send(instance.event_uuid)
127
128
129@receiver(pre_delete, sender=User)
130def event_user_pre_delete_cleanup(sender, instance: User, **_):
131    """If gdpr_compliance is enabled, remove all the user's events"""
132    from authentik.events.tasks import gdpr_cleanup
133
134    if get_current_tenant().gdpr_compliance:
135        gdpr_cleanup.send(instance.pk)
SESSION_LOGIN_EVENT = 'login_event'
@receiver(user_logged_in)
def on_user_logged_in( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, **_):
35@receiver(user_logged_in)
36def on_user_logged_in(sender, request: HttpRequest, user: User, **_):
37    """Log successful login"""
38    kwargs = {}
39    if SESSION_KEY_PLAN in request.session:
40        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
41        if PLAN_CONTEXT_SOURCE in flow_plan.context:
42            # Login request came from an external source, save it in the context
43            kwargs[PLAN_CONTEXT_SOURCE] = flow_plan.context[PLAN_CONTEXT_SOURCE]
44        if PLAN_CONTEXT_METHOD in flow_plan.context:
45            # Save the login method used
46            kwargs[PLAN_CONTEXT_METHOD] = flow_plan.context[PLAN_CONTEXT_METHOD]
47            kwargs[PLAN_CONTEXT_METHOD_ARGS] = flow_plan.context.get(PLAN_CONTEXT_METHOD_ARGS, {})
48        if PLAN_CONTEXT_OUTPOST in flow_plan.context:
49            # Save outpost context
50            kwargs[PLAN_CONTEXT_OUTPOST] = flow_plan.context[PLAN_CONTEXT_OUTPOST]
51        if PLAN_CONTEXT_DEVICE in flow_plan.context:
52            # Save device
53            kwargs[PLAN_CONTEXT_DEVICE] = flow_plan.context[PLAN_CONTEXT_DEVICE]
54    event = Event.new(EventAction.LOGIN, **kwargs).from_http(request, user=user)
55    request.session[SESSION_LOGIN_EVENT] = event
56    request.session.save()

Log successful login

def get_login_event( request_or_session: django.http.request.HttpRequest | authentik.core.models.AuthenticatedSession | None) -> authentik.events.models.Event | None:
59def get_login_event(request_or_session: HttpRequest | AuthenticatedSession | None) -> Event | None:
60    """Wrapper to get login event that can be mocked in tests"""
61    session = None
62    if not request_or_session:
63        return None
64    if isinstance(request_or_session, HttpRequest | Request):
65        session = request_or_session.session
66    if isinstance(request_or_session, AuthenticatedSession):
67        SessionStore = _session_engine.SessionStore
68        session = SessionStore(request_or_session.session.session_key)
69    return session.get(SESSION_LOGIN_EVENT, None)

Wrapper to get login event that can be mocked in tests

@receiver(user_logged_out)
def on_user_logged_out( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, **kwargs):
72@receiver(user_logged_out)
73def on_user_logged_out(sender, request: HttpRequest, user: User, **kwargs):
74    """Log successfully logout"""
75    # Check if this even comes from the user_login stage's middleware, which will set an extra
76    # argument
77    event = Event.new(EventAction.LOGOUT)
78    if "event_extra" in kwargs:
79        event.context.update(kwargs["event_extra"])
80    event.from_http(request, user=user)

Log successfully logout

@receiver(user_write)
def on_user_write( sender, request: django.http.request.HttpRequest, user: authentik.core.models.User, data: dict[str, typing.Any], **kwargs):
83@receiver(user_write)
84def on_user_write(sender, request: HttpRequest, user: User, data: dict[str, Any], **kwargs):
85    """Log User write"""
86    data["created"] = kwargs.get("created", False)
87    Event.new(EventAction.USER_WRITE, **data).from_http(request, user=user)

Log User write

@receiver(login_failed)
def on_login_failed( signal, sender, credentials: dict[str, str], request: django.http.request.HttpRequest, stage: authentik.flows.models.Stage | None = None, context: dict[str, Any] | None = None, **kwargs):
 90@receiver(login_failed)
 91def on_login_failed(
 92    signal,
 93    sender,
 94    credentials: dict[str, str],
 95    request: HttpRequest,
 96    stage: Stage | None = None,
 97    context: dict[str, Any] | None = None,
 98    **kwargs,
 99):
100    """Failed Login, authentik custom event"""
101    user = User.objects.filter(username=credentials.get("username")).first()
102    context = context or {}
103    Event.new(EventAction.LOGIN_FAILED, **credentials, stage=stage, **context).from_http(
104        request, user
105    )

Failed Login, authentik custom event

@receiver(invitation_used)
def on_invitation_used( sender, request: django.http.request.HttpRequest, invitation: authentik.stages.invitation.models.Invitation, **_):
108@receiver(invitation_used)
109def on_invitation_used(sender, request: HttpRequest, invitation: Invitation, **_):
110    """Log Invitation usage"""
111    Event.new(EventAction.INVITE_USED, invitation_uuid=invitation.invite_uuid.hex).from_http(
112        request
113    )

Log Invitation usage

@receiver(password_changed)
def on_password_changed( sender, user: authentik.core.models.User, password: str, request: django.http.request.HttpRequest | None, **_):
116@receiver(password_changed)
117def on_password_changed(sender, user: User, password: str, request: HttpRequest | None, **_):
118    """Log password change"""
119    Event.new(EventAction.PASSWORD_SET).from_http(request, user=user)

Log password change

@receiver(post_save, sender=Event)
def event_post_save_notification(sender, instance: authentik.events.models.Event, **_):
122@receiver(post_save, sender=Event)
123def event_post_save_notification(sender, instance: Event, **_):
124    """Start task to check if any policies trigger an notification on this event"""
125    from authentik.events.tasks import event_trigger_dispatch
126
127    event_trigger_dispatch.send(instance.event_uuid)

Start task to check if any policies trigger an notification on this event

@receiver(pre_delete, sender=User)
def event_user_pre_delete_cleanup(sender, instance: authentik.core.models.User, **_):
130@receiver(pre_delete, sender=User)
131def event_user_pre_delete_cleanup(sender, instance: User, **_):
132    """If gdpr_compliance is enabled, remove all the user's events"""
133    from authentik.events.tasks import gdpr_cleanup
134
135    if get_current_tenant().gdpr_compliance:
136        gdpr_cleanup.send(instance.pk)

If gdpr_compliance is enabled, remove all the user's events