authentik.events.logs

 1from collections.abc import Generator
 2from contextlib import contextmanager
 3from dataclasses import dataclass, field
 4from datetime import UTC, datetime
 5from typing import Any
 6
 7from django.utils.timezone import now
 8from rest_framework.fields import CharField, ChoiceField, DateTimeField, DictField
 9from structlog import configure, get_config
10from structlog.stdlib import NAME_TO_LEVEL, ProcessorFormatter, get_logger
11from structlog.testing import LogCapture
12from structlog.types import EventDict
13
14from authentik.core.api.utils import PassiveSerializer
15from authentik.events.utils import sanitize_dict
16
17
@dataclass()
class LogEvent:
    """A single structured log entry.

    Bundles the message, its level, the emitting logger's name, the time of
    the entry and any additional key/value context that was logged with it.
    """

    # The log message (the `event` key of a structlog event dict)
    event: str
    # Level name; `log` looks it up in `NAME_TO_LEVEL`
    log_level: str
    # Name of the logger the entry belongs to
    logger: str
    # Filled in via `now` when no timestamp is supplied
    timestamp: datetime = field(default_factory=now)
    # Remaining context of the entry
    attributes: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def from_event_dict(item: EventDict) -> LogEvent:
        """Build a `LogEvent` from a structlog event dict.

        `item` must contain `event`, `level`, `logger` and an ISO 8601
        `timestamp` string. `pid` and `log_level` are discarded when present;
        every other key is passed through `sanitize_dict` and stored as
        `attributes`. The passed mapping itself is not modified.

        Raises `KeyError` when one of the required keys is missing.
        """
        # Work on a shallow copy so the caller's mapping keeps all of its keys
        remaining = dict(item)
        event = remaining.pop("event")
        log_level = remaining.pop("level").lower()
        timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
        if timestamp.tzinfo is None:
            # Naive timestamps are taken to already be in UTC
            timestamp = timestamp.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted, so the instant they describe is kept
            # instead of relabelling their wall-clock time as UTC
            timestamp = timestamp.astimezone(UTC)
        remaining.pop("pid", None)
        # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
        remaining.pop("log_level", None)
        logger = remaining.pop("logger")
        return LogEvent(event, log_level, logger, timestamp, attributes=sanitize_dict(remaining))

    def log(self):
        """Emit this entry again through the logger named `self.logger`.

        The numeric level is looked up from `self.log_level` in `NAME_TO_LEVEL`
        and `self.attributes` are passed along as keyword arguments.
        """
        emit = get_logger(self.logger).log
        emit(NAME_TO_LEVEL[self.log_level], self.event, **self.attributes)
41
42
class LogEventSerializer(PassiveSerializer):
    """Single log message with all context logged."""

    timestamp = DateTimeField()
    # Every level name known to structlog is accepted, used as both value and label
    log_level = ChoiceField(choices=tuple((name, name) for name in NAME_TO_LEVEL))
    logger = CharField()
    event = CharField()
    attributes = DictField()

    # TODO(2024.6?): Migration helper so that logs saved in an older format
    # (mostly just list[str] with just the messages) still give a correct API response
    def to_representation(self, instance):
        """Serialize `instance`, first wrapping legacy plain-string logs in `LogEvent`.

        A bare string, or each element of a list, becomes a `LogEvent` with an empty
        level and logger name; any other value is handed to the parent class unchanged.
        """
        if isinstance(instance, list):
            instance = [LogEvent(message, "", "") for message in instance]
        elif isinstance(instance, str):
            instance = LogEvent(instance, "", "")
        return super().to_representation(instance)
60
61
@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
    """Capture log entries created

    Yields a list that is empty while the ``with`` body runs. Once the body
    finishes without raising, the list is filled with one `LogEvent` per entry
    recorded by structlog's `LogCapture`. The processors that were configured
    beforehand are restored on exit, whether or not the body raised.

    `log_default_output` is accepted but not used by this function.
    """
    captured: list[LogEvent] = []
    recorder = LogCapture()
    # Modify the list that `configure` stored in place and never swap it for a
    # new one, so that references held by bound loggers keep working.
    active: list = get_config()["processors"]
    saved = list(active)
    try:
        # Take out the formatter hand-off (when configured) and record entries instead
        if ProcessorFormatter.wrap_for_formatter in active:
            active.remove(ProcessorFormatter.wrap_for_formatter)
        active.append(recorder)
        configure(processors=active)
        yield captured
        captured.extend(LogEvent.from_event_dict(entry) for entry in recorder.entries)
    finally:
        # Drop the recorder again and put the original processors back
        active[:] = saved
        configure(processors=active)
@dataclass()
class LogEvent:
@dataclass()
class LogEvent:
    """A single structured log entry.

    Bundles the message, its level, the emitting logger's name, the time of
    the entry and any additional key/value context that was logged with it.
    """

    # The log message (the `event` key of a structlog event dict)
    event: str
    # Level name; `log` looks it up in `NAME_TO_LEVEL`
    log_level: str
    # Name of the logger the entry belongs to
    logger: str
    # Filled in via `now` when no timestamp is supplied
    timestamp: datetime = field(default_factory=now)
    # Remaining context of the entry
    attributes: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def from_event_dict(item: EventDict) -> LogEvent:
        """Build a `LogEvent` from a structlog event dict.

        `item` must contain `event`, `level`, `logger` and an ISO 8601
        `timestamp` string. `pid` and `log_level` are discarded when present;
        every other key is passed through `sanitize_dict` and stored as
        `attributes`. The passed mapping itself is not modified.

        Raises `KeyError` when one of the required keys is missing.
        """
        # Work on a shallow copy so the caller's mapping keeps all of its keys
        remaining = dict(item)
        event = remaining.pop("event")
        log_level = remaining.pop("level").lower()
        timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
        if timestamp.tzinfo is None:
            # Naive timestamps are taken to already be in UTC
            timestamp = timestamp.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted, so the instant they describe is kept
            # instead of relabelling their wall-clock time as UTC
            timestamp = timestamp.astimezone(UTC)
        remaining.pop("pid", None)
        # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
        remaining.pop("log_level", None)
        logger = remaining.pop("logger")
        return LogEvent(event, log_level, logger, timestamp, attributes=sanitize_dict(remaining))

    def log(self):
        """Emit this entry again through the logger named `self.logger`.

        The numeric level is looked up from `self.log_level` in `NAME_TO_LEVEL`
        and `self.attributes` are passed along as keyword arguments.
        """
        emit = get_logger(self.logger).log
        emit(NAME_TO_LEVEL[self.log_level], self.event, **self.attributes)
LogEvent(event: str, log_level: str, logger: str, timestamp: datetime.datetime = <factory>, attributes: dict[str, typing.Any] = <factory>)
event: str
log_level: str
logger: str
timestamp: datetime.datetime
attributes: dict[str, typing.Any]
@staticmethod
def from_event_dict(item: MutableMapping[str, Any]) -> LogEvent:
@staticmethod
def from_event_dict(item: EventDict) -> LogEvent:
    """Build a `LogEvent` from a structlog event dict.

    `item` must contain `event`, `level`, `logger` and an ISO 8601
    `timestamp` string. `pid` and `log_level` are discarded when present;
    every other key is passed through `sanitize_dict` and stored as
    `attributes`. The passed mapping itself is not modified.

    Raises `KeyError` when one of the required keys is missing.
    """
    # Work on a shallow copy so the caller's mapping keeps all of its keys
    remaining = dict(item)
    event = remaining.pop("event")
    log_level = remaining.pop("level").lower()
    timestamp = datetime.fromisoformat(remaining.pop("timestamp"))
    if timestamp.tzinfo is None:
        # Naive timestamps are taken to already be in UTC
        timestamp = timestamp.replace(tzinfo=UTC)
    else:
        # Aware timestamps are converted, so the instant they describe is kept
        # instead of relabelling their wall-clock time as UTC
        timestamp = timestamp.astimezone(UTC)
    remaining.pop("pid", None)
    # Sometimes log entries have both `level` and `log_level` set, but `level` is always set
    remaining.pop("log_level", None)
    logger = remaining.pop("logger")
    return LogEvent(event, log_level, logger, timestamp, attributes=sanitize_dict(remaining))
def log(self):
def log(self):
    """Emit this entry again through the logger named `self.logger`.

    The numeric level is looked up from `self.log_level` in `NAME_TO_LEVEL`
    and `self.attributes` are passed along as keyword arguments.
    """
    emit = get_logger(self.logger).log
    emit(NAME_TO_LEVEL[self.log_level], self.event, **self.attributes)
class LogEventSerializer(authentik.core.api.utils.PassiveSerializer):
class LogEventSerializer(PassiveSerializer):
    """Single log message with all context logged."""

    timestamp = DateTimeField()
    # Every level name known to structlog is accepted, used as both value and label
    log_level = ChoiceField(choices=tuple((name, name) for name in NAME_TO_LEVEL))
    logger = CharField()
    event = CharField()
    attributes = DictField()

    # TODO(2024.6?): Migration helper so that logs saved in an older format
    # (mostly just list[str] with just the messages) still give a correct API response
    def to_representation(self, instance):
        """Serialize `instance`, first wrapping legacy plain-string logs in `LogEvent`.

        A bare string, or each element of a list, becomes a `LogEvent` with an empty
        level and logger name; any other value is handed to the parent class unchanged.
        """
        if isinstance(instance, list):
            instance = [LogEvent(message, "", "") for message in instance]
        elif isinstance(instance, str):
            instance = LogEvent(instance, "", "")
        return super().to_representation(instance)

Single log message with all context logged.

timestamp
log_level
logger
event
attributes
def to_representation(self, instance):
def to_representation(self, instance):
    """Serialize `instance`, first wrapping legacy plain-string logs in `LogEvent`.

    A bare string, or each element of a list, becomes a `LogEvent` with an empty
    level and logger name; any other value is handed to the parent class unchanged.
    """
    if isinstance(instance, list):
        instance = [LogEvent(message, "", "") for message in instance]
    elif isinstance(instance, str):
        instance = LogEvent(instance, "", "")
    return super().to_representation(instance)

Object instance -> Dict of primitive datatypes.

@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
@contextmanager
def capture_logs(log_default_output=True) -> Generator[list[LogEvent]]:
    """Capture log entries created

    Yields a list that is empty while the ``with`` body runs. Once the body
    finishes without raising, the list is filled with one `LogEvent` per entry
    recorded by structlog's `LogCapture`. The processors that were configured
    beforehand are restored on exit, whether or not the body raised.

    `log_default_output` is accepted but not used by this function.
    """
    captured: list[LogEvent] = []
    recorder = LogCapture()
    # Modify the list that `configure` stored in place and never swap it for a
    # new one, so that references held by bound loggers keep working.
    active: list = get_config()["processors"]
    saved = list(active)
    try:
        # Take out the formatter hand-off (when configured) and record entries instead
        if ProcessorFormatter.wrap_for_formatter in active:
            active.remove(ProcessorFormatter.wrap_for_formatter)
        active.append(recorder)
        configure(processors=active)
        yield captured
        captured.extend(LogEvent.from_event_dict(entry) for entry in recorder.entries)
    finally:
        # Drop the recorder again and put the original processors back
        active[:] = saved
        configure(processors=active)

Capture log entries created