authentik.enterprise.providers.ssf.tasks
1from typing import Any 2from uuid import UUID 3 4from django.http import HttpRequest 5from django.utils.timezone import now 6from django.utils.translation import gettext_lazy as _ 7from dramatiq.actor import actor 8from requests.exceptions import RequestException 9from structlog.stdlib import get_logger 10 11from authentik.core.apps import AppAccessWithoutBindings 12from authentik.core.models import User 13from authentik.enterprise.providers.ssf.models import ( 14 DeliveryMethods, 15 EventTypes, 16 SSFEventStatus, 17 Stream, 18 StreamEvent, 19 StreamStatus, 20) 21from authentik.lib.utils.http import get_http_session 22from authentik.lib.utils.time import timedelta_from_string 23from authentik.policies.engine import PolicyEngine 24from authentik.tasks.middleware import CurrentTask 25 26session = get_http_session() 27LOGGER = get_logger() 28 29 30def send_ssf_events( 31 event_type: EventTypes, 32 data: dict, 33 stream_filter: dict | None = None, 34 request: HttpRequest | None = None, 35 **extra_data, 36): 37 """Wrapper to send an SSF event to multiple streams""" 38 events_data = {} 39 if not stream_filter: 40 stream_filter = {} 41 stream_filter["events_requested__contains"] = [event_type] 42 if request and hasattr(request, "request_id"): 43 extra_data.setdefault("txn", request.request_id) 44 for stream in Stream.objects.filter(**stream_filter): 45 event_data = stream.prepare_event_payload(event_type, data, **extra_data) 46 events_data[stream.uuid] = event_data 47 if not events_data: 48 return 49 ssf_events_dispatch.send(events_data) 50 51 52@actor(description=_("Dispatch SSF events.")) 53def ssf_events_dispatch(events_data: dict[str, dict[str, Any]]): 54 for stream_uuid, event_data in events_data.items(): 55 stream = Stream.objects.filter(pk=stream_uuid).first() 56 if not stream: 57 continue 58 send_ssf_event.send_with_options(args=(stream_uuid, event_data), rel_obj=stream.provider) 59 60 61def _check_app_access(stream: Stream, event_data: dict) -> bool: 62 """Check if event is related to user and if so, check 63 if the user has access to the application""" 64 # `event_data` is a dict version of a StreamEvent 65 sub_id = event_data.get("payload", {}).get("sub_id", {}) 66 email = sub_id.get("user", {}).get("email", None) 67 if not email: 68 return True 69 user = User.objects.filter(email=email).first() 70 if not user: 71 return True 72 engine = PolicyEngine(stream.provider.backchannel_application, user) 73 engine.empty_result = AppAccessWithoutBindings.get() 74 engine.use_cache = False 75 engine.build() 76 return engine.passing 77 78 79@actor(description=_("Send an SSF event.")) 80def send_ssf_event(stream_uuid: UUID, event_data: dict[str, Any]): 81 self = CurrentTask.get_task() 82 83 stream = Stream.objects.filter(pk=stream_uuid).first() 84 if not stream: 85 return 86 if not _check_app_access(stream, event_data): 87 return 88 event = StreamEvent.objects.create(**event_data) 89 self.set_uid(event.pk) 90 if event.status == SSFEventStatus.SENT: 91 return 92 if stream.delivery_method not in [DeliveryMethods.RISC_PUSH, DeliveryMethods.RFC_PUSH]: 93 return 94 95 headers = {"Content-Type": "application/secevent+jwt", "Accept": "application/json"} 96 if stream.authorization_header: 97 headers["Authorization"] = stream.authorization_header 98 try: 99 response = session.post( 100 event.stream.endpoint_url, 101 data=event.stream.encode(event.payload), 102 headers=headers, 103 verify=stream.provider.push_verify_certificates, 104 timeout=180, 105 ) 106 response.raise_for_status() 107 event.status = SSFEventStatus.SENT 108 event.save() 109 self.info("Event successfully sent", status=response.status_code) 110 # Cleanup, if we were the last pending message for this stream and it has been deleted 111 # (status=StreamStatus.DISABLED), then we can delete the stream 112 if ( 113 not StreamEvent.objects.filter( 114 stream=stream, 115 status__in=[SSFEventStatus.PENDING_FAILED, SSFEventStatus.PENDING_NEW], 116 ).exists() 117 and stream.status == StreamStatus.DISABLED 118 ): 119 LOGGER.info( 120 "Deleting inactive stream as all pending messages were sent.", stream=stream 121 ) 122 self.info("Deleting inactive stream as all pending messages were sent.") 123 stream.delete() 124 except RequestException as exc: 125 LOGGER.warning("Failed to send SSF event", exc=exc, stream=stream) 126 attrs = {} 127 if exc.response is not None: 128 attrs["response"] = { 129 "content": exc.response.text, 130 "status": exc.response.status_code, 131 } 132 self.warning(exc) 133 self.warning("Failed to send request", **attrs) 134 # Re-up the expiry of the stream event 135 event.expires = now() + timedelta_from_string(event.stream.provider.event_retention) 136 self.info(f"Event will be re-sent at {event.expires}") 137 event.status = SSFEventStatus.PENDING_FAILED 138 event.save()
session =
<authentik.lib.utils.http.TimeoutSession object>
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
send_ssf_events( event_type: authentik.enterprise.providers.ssf.models.EventTypes, data: dict, stream_filter: dict | None = None, request: django.http.request.HttpRequest | None = None, **extra_data):
31def send_ssf_events( 32 event_type: EventTypes, 33 data: dict, 34 stream_filter: dict | None = None, 35 request: HttpRequest | None = None, 36 **extra_data, 37): 38 """Wrapper to send an SSF event to multiple streams""" 39 events_data = {} 40 if not stream_filter: 41 stream_filter = {} 42 stream_filter["events_requested__contains"] = [event_type] 43 if request and hasattr(request, "request_id"): 44 extra_data.setdefault("txn", request.request_id) 45 for stream in Stream.objects.filter(**stream_filter): 46 event_data = stream.prepare_event_payload(event_type, data, **extra_data) 47 events_data[stream.uuid] = event_data 48 if not events_data: 49 return 50 ssf_events_dispatch.send(events_data)
Wrapper to send an SSF event to multiple streams