authentik.events.logs
"""Structured log events: capture, (de)serialization and re-emission."""

from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any

from django.utils.timezone import now
from rest_framework.fields import CharField, ChoiceField, DateTimeField, DictField
from structlog import configure, get_config
from structlog.stdlib import NAME_TO_LEVEL, ProcessorFormatter, get_logger
from structlog.testing import LogCapture
from structlog.types import EventDict

from authentik.core.api.utils import PassiveSerializer
from authentik.events.utils import sanitize_dict


@dataclass()
class LogEvent:
    """A single log entry, detached from structlog's raw event dict."""

    # The log message.
    event: str
    # Level name; `log()` uses it as a key into `NAME_TO_LEVEL`.
    log_level: str
    # Name of the logger the entry belongs to.
    logger: str
    # Defaults to the moment the instance is created.
    timestamp: datetime = field(default_factory=now)
    # All remaining context of the entry.
    attributes: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def from_event_dict(item: EventDict) -> LogEvent:
        """Build a `LogEvent` from a structlog event dict.

        `item` must contain `event`, `level`, `timestamp` (ISO 8601 string) and
        `logger`; a missing key raises `KeyError`. `pid` and `log_level` are
        dropped, everything else ends up (sanitized) in `attributes`.

        The input mapping is left untouched.
        """
        # Work on a shallow copy so the caller's dict is not emptied by the pops below.
        remaining = dict(item)
        event = remaining.pop("event")
        log_level = remaining.pop("level").lower()
        timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
        if timestamp.tzinfo is None:
            # Naive timestamps are taken to be UTC.
            timestamp = timestamp.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted, so the instant is preserved rather
            # than relabelled.
            timestamp = timestamp.astimezone(UTC)
        remaining.pop("pid", None)
        # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
        remaining.pop("log_level", None)
        logger = remaining.pop("logger")
        return LogEvent(
            event, log_level, logger, timestamp, attributes=sanitize_dict(remaining)
        )

    def log(self) -> None:
        """Emit this entry through the structlog logger named `self.logger`.

        Raises `KeyError` when `self.log_level` is not a key of `NAME_TO_LEVEL`.
        """
        get_logger(self.logger).log(NAME_TO_LEVEL[self.log_level], self.event, **self.attributes)


class LogEventSerializer(PassiveSerializer):
    """Single log message with all context logged."""

    timestamp = DateTimeField()
    log_level = ChoiceField(choices=tuple((name, name) for name in NAME_TO_LEVEL))
    logger = CharField()
    event = CharField()
    attributes = DictField()

    # TODO(2024.6?): This is a migration helper to return a correct API response for logs that
    # have been saved in an older format (mostly just list[str] with just the messages)
    def to_representation(self, instance):
        """Wrap legacy plain-string log data in `LogEvent` before serializing."""
        if isinstance(instance, list):
            instance = [LogEvent(message, "", "") for message in instance]
        elif isinstance(instance, str):
            instance = LogEvent(instance, "", "")
        return super().to_representation(instance)


@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
    """Capture log entries created inside the `with` block.

    The yielded list starts empty and is filled with `LogEvent` objects only
    after the block finishes without raising. `log_default_output` is accepted
    but not used by this function.
    """
    captured: list[LogEvent] = []
    capture = LogCapture()
    # Modify `_Configuration.default_processors` set via `configure` but always
    # keep the list instance intact to not break references held by bound
    # loggers.
    active: list = get_config()["processors"]
    saved = list(active)
    try:
        # Swap the formatter hand-off for the capturing processor.
        if ProcessorFormatter.wrap_for_formatter in active:
            active.remove(ProcessorFormatter.wrap_for_formatter)
        active.append(capture)
        configure(processors=active)
        yield captured
        captured.extend(LogEvent.from_event_dict(entry) for entry in capture.entries)
    finally:
        # Restore the original processors in place (same list object).
        active[:] = saved
        configure(processors=active)
@dataclass()
class
LogEvent:
@dataclass()
class LogEvent:
    """A single log entry, detached from structlog's raw event dict."""

    # The log message.
    event: str
    # Level name; `log()` uses it as a key into `NAME_TO_LEVEL`.
    log_level: str
    # Name of the logger the entry belongs to.
    logger: str
    # Defaults to the moment the instance is created.
    timestamp: datetime = field(default_factory=now)
    # All remaining context of the entry.
    attributes: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def from_event_dict(item: EventDict) -> LogEvent:
        """Build a `LogEvent` from a structlog event dict.

        `item` must contain `event`, `level`, `timestamp` (ISO 8601 string) and
        `logger`; a missing key raises `KeyError`. `pid` and `log_level` are
        dropped, everything else ends up (sanitized) in `attributes`.

        The input mapping is left untouched.
        """
        # Work on a shallow copy so the caller's dict is not emptied by the pops below.
        remaining = dict(item)
        event = remaining.pop("event")
        log_level = remaining.pop("level").lower()
        timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
        if timestamp.tzinfo is None:
            # Naive timestamps are taken to be UTC.
            timestamp = timestamp.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted, so the instant is preserved rather
            # than relabelled.
            timestamp = timestamp.astimezone(UTC)
        remaining.pop("pid", None)
        # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
        remaining.pop("log_level", None)
        logger = remaining.pop("logger")
        return LogEvent(
            event, log_level, logger, timestamp, attributes=sanitize_dict(remaining)
        )

    def log(self) -> None:
        """Emit this entry through the structlog logger named `self.logger`.

        Raises `KeyError` when `self.log_level` is not a key of `NAME_TO_LEVEL`.
        """
        get_logger(self.logger).log(NAME_TO_LEVEL[self.log_level], self.event, **self.attributes)
LogEvent( event: str, log_level: str, logger: str, timestamp: datetime.datetime = <factory>, attributes: dict[str, typing.Any] = <factory>)
@staticmethod
def from_event_dict(item: EventDict) -> LogEvent:
    """Build a `LogEvent` from a structlog event dict.

    `item` must contain `event`, `level`, `timestamp` (ISO 8601 string) and
    `logger`; a missing key raises `KeyError`. `pid` and `log_level` are
    dropped, everything else ends up (sanitized) in `attributes`.

    The input mapping is left untouched.
    """
    # Work on a shallow copy so the caller's dict is not emptied by the pops below.
    remaining = dict(item)
    event = remaining.pop("event")
    log_level = remaining.pop("level").lower()
    timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
    if timestamp.tzinfo is None:
        # Naive timestamps are taken to be UTC.
        timestamp = timestamp.replace(tzinfo=UTC)
    else:
        # Aware timestamps are converted, so the instant is preserved rather
        # than relabelled.
        timestamp = timestamp.astimezone(UTC)
    remaining.pop("pid", None)
    # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
    remaining.pop("log_level", None)
    logger = remaining.pop("logger")
    return LogEvent(
        event, log_level, logger, timestamp, attributes=sanitize_dict(remaining)
    )
class LogEventSerializer(PassiveSerializer):
    """Single log message with all context logged."""

    timestamp = DateTimeField()
    log_level = ChoiceField(choices=tuple((name, name) for name in NAME_TO_LEVEL))
    logger = CharField()
    event = CharField()
    attributes = DictField()

    # TODO(2024.6?): This is a migration helper to return a correct API response for logs that
    # have been saved in an older format (mostly just list[str] with just the messages)
    def to_representation(self, instance):
        """Wrap legacy plain-string log data in `LogEvent` before serializing.

        A `str` becomes one `LogEvent` and a `list` becomes a list of them, each
        with empty level and logger. Any other instance is passed on unchanged.
        """
        if isinstance(instance, list):
            instance = [LogEvent(message, "", "") for message in instance]
        elif isinstance(instance, str):
            instance = LogEvent(instance, "", "")
        return super().to_representation(instance)
Single log message with all context logged.
def
to_representation(self, instance):
def to_representation(self, instance):
    """Wrap legacy plain-string log data in `LogEvent` before serializing.

    A `str` becomes one `LogEvent` and a `list` becomes a list of them, each
    with empty level and logger. Any other instance is passed on unchanged.
    """
    if isinstance(instance, list):
        instance = [LogEvent(message, "", "") for message in instance]
    elif isinstance(instance, str):
        instance = LogEvent(instance, "", "")
    return super().to_representation(instance)
Object instance -> Dict of primitive datatypes.
Inherited Members
@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
    """Capture log entries created inside the `with` block.

    The yielded list starts empty and is filled with `LogEvent` objects only
    after the block finishes without raising. `log_default_output` is accepted
    but not used by this function.
    """
    captured = []
    capture = LogCapture()
    # Modify `_Configuration.default_processors` set via `configure` but always
    # keep the list instance intact to not break references held by bound
    # loggers.
    active: list = get_config()["processors"]
    saved = list(active)
    try:
        # Swap the formatter hand-off for the capturing processor.
        if ProcessorFormatter.wrap_for_formatter in active:
            active.remove(ProcessorFormatter.wrap_for_formatter)
        active.append(capture)
        configure(processors=active)
        yield captured
        captured.extend(LogEvent.from_event_dict(entry) for entry in capture.entries)
    finally:
        # Restore the original processors in place (same list object).
        active[:] = saved
        configure(processors=active)
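Capture log entries created inside the `with` block; the yielded list is filled with `LogEvent` objects once the block exits without raising.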