authentik.enterprise.providers.ssf.tasks

  1from typing import Any
  2from uuid import UUID
  3
  4from django.http import HttpRequest
  5from django.utils.timezone import now
  6from django.utils.translation import gettext_lazy as _
  7from dramatiq.actor import actor
  8from requests.exceptions import RequestException
  9from structlog.stdlib import get_logger
 10
 11from authentik.core.apps import AppAccessWithoutBindings
 12from authentik.core.models import User
 13from authentik.enterprise.providers.ssf.models import (
 14    DeliveryMethods,
 15    EventTypes,
 16    SSFEventStatus,
 17    Stream,
 18    StreamEvent,
 19    StreamStatus,
 20)
 21from authentik.lib.utils.http import get_http_session
 22from authentik.lib.utils.time import timedelta_from_string
 23from authentik.policies.engine import PolicyEngine
 24from authentik.tasks.middleware import CurrentTask
 25
 26session = get_http_session()
 27LOGGER = get_logger()
 28
 29
 30def send_ssf_events(
 31    event_type: EventTypes,
 32    data: dict,
 33    stream_filter: dict | None = None,
 34    request: HttpRequest | None = None,
 35    **extra_data,
 36):
 37    """Wrapper to send an SSF event to multiple streams"""
 38    events_data = {}
 39    if not stream_filter:
 40        stream_filter = {}
 41    stream_filter["events_requested__contains"] = [event_type]
 42    if request and hasattr(request, "request_id"):
 43        extra_data.setdefault("txn", request.request_id)
 44    for stream in Stream.objects.filter(**stream_filter):
 45        event_data = stream.prepare_event_payload(event_type, data, **extra_data)
 46        events_data[stream.uuid] = event_data
 47    if not events_data:
 48        return
 49    ssf_events_dispatch.send(events_data)
 50
 51
 52@actor(description=_("Dispatch SSF events."))
 53def ssf_events_dispatch(events_data: dict[str, dict[str, Any]]):
 54    for stream_uuid, event_data in events_data.items():
 55        stream = Stream.objects.filter(pk=stream_uuid).first()
 56        if not stream:
 57            continue
 58        send_ssf_event.send_with_options(args=(stream_uuid, event_data), rel_obj=stream.provider)
 59
 60
 61def _check_app_access(stream: Stream, event_data: dict) -> bool:
 62    """Check if event is related to user and if so, check
 63    if the user has access to the application"""
 64    # `event_data` is a dict version of a StreamEvent
 65    sub_id = event_data.get("payload", {}).get("sub_id", {})
 66    email = sub_id.get("user", {}).get("email", None)
 67    if not email:
 68        return True
 69    user = User.objects.filter(email=email).first()
 70    if not user:
 71        return True
 72    engine = PolicyEngine(stream.provider.backchannel_application, user)
 73    engine.empty_result = AppAccessWithoutBindings.get()
 74    engine.use_cache = False
 75    engine.build()
 76    return engine.passing
 77
 78
 79@actor(description=_("Send an SSF event."))
 80def send_ssf_event(stream_uuid: UUID, event_data: dict[str, Any]):
 81    self = CurrentTask.get_task()
 82
 83    stream = Stream.objects.filter(pk=stream_uuid).first()
 84    if not stream:
 85        return
 86    if not _check_app_access(stream, event_data):
 87        return
 88    event = StreamEvent.objects.create(**event_data)
 89    self.set_uid(event.pk)
 90    if event.status == SSFEventStatus.SENT:
 91        return
 92    if stream.delivery_method not in [DeliveryMethods.RISC_PUSH, DeliveryMethods.RFC_PUSH]:
 93        return
 94
 95    headers = {"Content-Type": "application/secevent+jwt", "Accept": "application/json"}
 96    if stream.authorization_header:
 97        headers["Authorization"] = stream.authorization_header
 98    try:
 99        response = session.post(
100            event.stream.endpoint_url,
101            data=event.stream.encode(event.payload),
102            headers=headers,
103            verify=stream.provider.push_verify_certificates,
104            timeout=180,
105        )
106        response.raise_for_status()
107        event.status = SSFEventStatus.SENT
108        event.save()
109        self.info("Event successfully sent", status=response.status_code)
110        # Cleanup, if we were the last pending message for this stream and it has been deleted
111        # (status=StreamStatus.DISABLED), then we can delete the stream
112        if (
113            not StreamEvent.objects.filter(
114                stream=stream,
115                status__in=[SSFEventStatus.PENDING_FAILED, SSFEventStatus.PENDING_NEW],
116            ).exists()
117            and stream.status == StreamStatus.DISABLED
118        ):
119            LOGGER.info(
120                "Deleting inactive stream as all pending messages were sent.", stream=stream
121            )
122            self.info("Deleting inactive stream as all pending messages were sent.")
123            stream.delete()
124    except RequestException as exc:
125        LOGGER.warning("Failed to send SSF event", exc=exc, stream=stream)
126        attrs = {}
127        if exc.response is not None:
128            attrs["response"] = {
129                "content": exc.response.text,
130                "status": exc.response.status_code,
131            }
132        self.warning(exc)
133        self.warning("Failed to send request", **attrs)
134        # Re-up the expiry of the stream event
135        event.expires = now() + timedelta_from_string(event.stream.provider.event_retention)
136        self.info(f"Event will be re-sent at {event.expires}")
137        event.status = SSFEventStatus.PENDING_FAILED
138        event.save()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def send_ssf_events( event_type: authentik.enterprise.providers.ssf.models.EventTypes, data: dict, stream_filter: dict | None = None, request: django.http.request.HttpRequest | None = None, **extra_data):
def send_ssf_events(
    event_type: EventTypes,
    data: dict,
    stream_filter: dict | None = None,
    request: HttpRequest | None = None,
    **extra_data,
):
    """Wrapper to send an SSF event to multiple streams.

    Selects every stream that matches `stream_filter` and lists `event_type` in its
    `events_requested`, prepares one event payload per stream and hands all of them to
    the `ssf_events_dispatch` task in a single message. Nothing is enqueued when no
    stream matches.

    Args:
        event_type: Type of the event; also used to select the streams that requested it.
        data: Event data, passed through to `Stream.prepare_event_payload`.
        stream_filter: Optional additional keyword lookups for `Stream.objects.filter`.
            The passed dict is not modified.
        request: Optional request; when it carries a `request_id`, that ID is used as the
            default for the `txn` entry of `extra_data`.
        **extra_data: Extra keyword arguments forwarded to `Stream.prepare_event_payload`.
    """
    # Build a fresh dict instead of writing into `stream_filter`, so a filter dict that the
    # caller keeps using (or shares between calls) never picks up the event-type lookup.
    filters = {**(stream_filter or {}), "events_requested__contains": [event_type]}
    if request and hasattr(request, "request_id"):
        extra_data.setdefault("txn", request.request_id)
    events_data = {
        stream.uuid: stream.prepare_event_payload(event_type, data, **extra_data)
        for stream in Stream.objects.filter(**filters)
    }
    if not events_data:
        return
    ssf_events_dispatch.send(events_data)

Wrapper to send an SSF event to multiple streams