authentik.events.utils

event utilities

  1"""event utilities"""
  2
  3import re
  4from copy import copy
  5from dataclasses import asdict, is_dataclass
  6from datetime import date, datetime, time, timedelta
  7from enum import Enum
  8from pathlib import Path
  9from types import GeneratorType, NoneType
 10from typing import Any
 11from uuid import UUID
 12
 13from django.contrib.auth.models import AnonymousUser
 14from django.core.handlers.wsgi import WSGIRequest
 15from django.core.serializers.json import DjangoJSONEncoder
 16from django.db import models
 17from django.db.models.base import Model
 18from django.http.request import HttpRequest
 19from django.utils import timezone
 20from django.views.debug import SafeExceptionReporterFilter
 21from geoip2.models import ASN, City
 22from guardian.conf import settings
 23from guardian.shortcuts import get_anonymous_user
 24
 25from authentik.blueprints.v1.common import YAMLTag
 26from authentik.core.models import User
 27from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
 28from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
 29from authentik.policies.types import PolicyRequest
 30
 31# Special keys which are *not* cleaned, even when the default filter
 32# is matched
 33ALLOWED_SPECIAL_KEYS = re.compile(
 34    r"passing|password_change_date|^auth_method(_args)?$",
 35    flags=re.I,
 36)
 37
 38
 39def cleanse_str(raw: Any) -> str:
 40    return str(raw).replace("\u0000", "")
 41
 42
 43def cleanse_item(key: str, value: Any) -> Any:
 44    """Cleanse a single item"""
 45    if isinstance(value, dict):
 46        return cleanse_dict(value)
 47    if isinstance(value, list | tuple | set):
 48        for idx, item in enumerate(value):
 49            value[idx] = cleanse_item(key, item)
 50        return value
 51    try:
 52        if not SafeExceptionReporterFilter.hidden_settings.search(key):
 53            return value
 54        if ALLOWED_SPECIAL_KEYS.search(key):
 55            return value
 56        return SafeExceptionReporterFilter.cleansed_substitute
 57    except TypeError:  # pragma: no cover
 58        return value
 59
 60
 61def cleanse_dict(source: dict[Any, Any]) -> dict[Any, Any]:
 62    """Cleanse a dictionary, recursively"""
 63    final_dict = {}
 64    for key, value in source.items():
 65        new_value = cleanse_item(key, value)
 66        if new_value is not ...:
 67            final_dict[key] = new_value
 68    return final_dict
 69
 70
 71def model_to_dict(model: Model) -> dict[str, Any]:
 72    """Convert model to dict"""
 73    name = cleanse_str(model)
 74    if hasattr(model, "name"):
 75        name = model.name
 76    return {
 77        "app": model._meta.app_label,
 78        "model_name": model._meta.model_name,
 79        "pk": model.pk,
 80        "name": name,
 81    }
 82
 83
 84def get_user(user: User | AnonymousUser) -> dict[str, Any]:
 85    """Convert user object to dictionary"""
 86    if isinstance(user, AnonymousUser):
 87        try:
 88            user = get_anonymous_user()
 89        except User.DoesNotExist:
 90            return {}
 91    user_data = {
 92        "username": user.username,
 93        "pk": user.pk,
 94        "email": user.email,
 95    }
 96    if user.username == settings.ANONYMOUS_USER_NAME:
 97        user_data["is_anonymous"] = True
 98    return user_data
 99
100
101def sanitize_item(value: Any) -> Any:  # noqa: PLR0911, PLR0912
102    """Sanitize a single item, ensure it is JSON parsable"""
103    if is_dataclass(value):
104        # Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict,
105        # and deepcopy doesn't work with HttpRequest (neither django nor rest_framework).
106        # (more specifically doesn't work with ResolverMatch)
107        # rest_framework's custom Request class makes this more complicated as it also holds a
108        # thread lock.
109        # Since this class is mainly used for Events which already hold the http request context
110        # we just remove the http_request from the shallow policy request
111        # Currently, the only dataclass that actually holds an http request is a PolicyRequest
112        if isinstance(value, PolicyRequest) and value.http_request is not None:
113            value: PolicyRequest = copy(value)
114            value.http_request = None
115        value = asdict(value)
116    if isinstance(value, dict):
117        return sanitize_dict(value)
118    if isinstance(value, GeneratorType):
119        return sanitize_item(list(value))
120    if isinstance(value, list | tuple | set):
121        new_values = []
122        for item in value:
123            new_value = sanitize_item(item)
124            if new_value:
125                new_values.append(new_value)
126        return new_values
127    if isinstance(value, User | AnonymousUser):
128        return sanitize_dict(get_user(value))
129    if isinstance(value, models.Model):
130        return sanitize_dict(model_to_dict(value))
131    if isinstance(value, UUID):
132        return value.hex
133    if isinstance(value, HttpRequest | WSGIRequest):
134        return ...
135    if isinstance(value, City):
136        return GEOIP_CONTEXT_PROCESSOR.city_to_dict(value)
137    if isinstance(value, ASN):
138        return ASN_CONTEXT_PROCESSOR.asn_to_dict(value)
139    if isinstance(value, Path):
140        return cleanse_str(value)
141    if isinstance(value, Exception):
142        return cleanse_str(value)
143    if isinstance(value, YAMLTag):
144        return cleanse_str(value)
145    if isinstance(value, Enum):
146        return value.value
147    if isinstance(value, type):
148        return {
149            "type": value.__name__,
150            "module": value.__module__,
151        }
152    # See
153    # https://github.com/encode/django-rest-framework/blob/master/rest_framework/utils/encoders.py
154    # For Date Time string spec, see ECMA 262
155    # https://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15
156    if isinstance(value, datetime):
157        representation = value.isoformat()
158        if representation.endswith("+00:00"):
159            representation = representation[:-6] + "Z"
160        return representation
161    if isinstance(value, date):
162        return value.isoformat()
163    if isinstance(value, time):
164        if timezone and timezone.is_aware(value):
165            raise ValueError("JSON can't represent timezone-aware times.")
166        return value.isoformat()
167    if isinstance(value, timedelta):
168        return cleanse_str(value.total_seconds())
169    if callable(value):
170        return {
171            "type": "callable",
172            "name": value.__name__,
173            "module": value.__module__,
174        }
175    # List taken from the stdlib's JSON encoder (_make_iterencode, encoder.py:415)
176    if isinstance(value, bool | int | float | NoneType | list | tuple | dict):
177        return value
178    try:
179        return DjangoJSONEncoder().default(value)
180    except TypeError:
181        return cleanse_str(value)
182    return cleanse_str(value)
183
184
185def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
186    """clean source of all Models that would interfere with the JSONField.
187    Models are replaced with a dictionary of {
188        app: str,
189        name: str,
190        pk: Any
191    }"""
192    final_dict = {}
193    for key, value in source.items():
194        new_value = sanitize_item(value)
195        if new_value is not ...:
196            final_dict[key] = new_value
197    return final_dict
ALLOWED_SPECIAL_KEYS = re.compile('passing|password_change_date|^auth_method(_args)?$', re.IGNORECASE)
def cleanse_str(raw: Any) -> str:
40def cleanse_str(raw: Any) -> str:
41    return str(raw).replace("\u0000", "")
def cleanse_item(key: str, value: Any) -> Any:
44def cleanse_item(key: str, value: Any) -> Any:
45    """Cleanse a single item"""
46    if isinstance(value, dict):
47        return cleanse_dict(value)
48    if isinstance(value, list | tuple | set):
49        for idx, item in enumerate(value):
50            value[idx] = cleanse_item(key, item)
51        return value
52    try:
53        if not SafeExceptionReporterFilter.hidden_settings.search(key):
54            return value
55        if ALLOWED_SPECIAL_KEYS.search(key):
56            return value
57        return SafeExceptionReporterFilter.cleansed_substitute
58    except TypeError:  # pragma: no cover
59        return value

Cleanse a single item

def cleanse_dict(source: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
62def cleanse_dict(source: dict[Any, Any]) -> dict[Any, Any]:
63    """Cleanse a dictionary, recursively"""
64    final_dict = {}
65    for key, value in source.items():
66        new_value = cleanse_item(key, value)
67        if new_value is not ...:
68            final_dict[key] = new_value
69    return final_dict

Cleanse a dictionary, recursively

def model_to_dict(model: django.db.models.base.Model) -> dict[str, typing.Any]:
72def model_to_dict(model: Model) -> dict[str, Any]:
73    """Convert model to dict"""
74    name = cleanse_str(model)
75    if hasattr(model, "name"):
76        name = model.name
77    return {
78        "app": model._meta.app_label,
79        "model_name": model._meta.model_name,
80        "pk": model.pk,
81        "name": name,
82    }

Convert model to dict

def get_user( user: authentik.core.models.User | django.contrib.auth.models.AnonymousUser) -> dict[str, typing.Any]:
85def get_user(user: User | AnonymousUser) -> dict[str, Any]:
86    """Convert user object to dictionary"""
87    if isinstance(user, AnonymousUser):
88        try:
89            user = get_anonymous_user()
90        except User.DoesNotExist:
91            return {}
92    user_data = {
93        "username": user.username,
94        "pk": user.pk,
95        "email": user.email,
96    }
97    if user.username == settings.ANONYMOUS_USER_NAME:
98        user_data["is_anonymous"] = True
99    return user_data

Convert user object to dictionary

def sanitize_item(value: Any) -> Any:
102def sanitize_item(value: Any) -> Any:  # noqa: PLR0911, PLR0912
103    """Sanitize a single item, ensure it is JSON parsable"""
104    if is_dataclass(value):
105        # Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict,
106        # and deepcopy doesn't work with HttpRequest (neither django nor rest_framework).
107        # (more specifically doesn't work with ResolverMatch)
108        # rest_framework's custom Request class makes this more complicated as it also holds a
109        # thread lock.
110        # Since this class is mainly used for Events which already hold the http request context
111        # we just remove the http_request from the shallow policy request
112        # Currently, the only dataclass that actually holds an http request is a PolicyRequest
113        if isinstance(value, PolicyRequest) and value.http_request is not None:
114            value: PolicyRequest = copy(value)
115            value.http_request = None
116        value = asdict(value)
117    if isinstance(value, dict):
118        return sanitize_dict(value)
119    if isinstance(value, GeneratorType):
120        return sanitize_item(list(value))
121    if isinstance(value, list | tuple | set):
122        new_values = []
123        for item in value:
124            new_value = sanitize_item(item)
125            if new_value:
126                new_values.append(new_value)
127        return new_values
128    if isinstance(value, User | AnonymousUser):
129        return sanitize_dict(get_user(value))
130    if isinstance(value, models.Model):
131        return sanitize_dict(model_to_dict(value))
132    if isinstance(value, UUID):
133        return value.hex
134    if isinstance(value, HttpRequest | WSGIRequest):
135        return ...
136    if isinstance(value, City):
137        return GEOIP_CONTEXT_PROCESSOR.city_to_dict(value)
138    if isinstance(value, ASN):
139        return ASN_CONTEXT_PROCESSOR.asn_to_dict(value)
140    if isinstance(value, Path):
141        return cleanse_str(value)
142    if isinstance(value, Exception):
143        return cleanse_str(value)
144    if isinstance(value, YAMLTag):
145        return cleanse_str(value)
146    if isinstance(value, Enum):
147        return value.value
148    if isinstance(value, type):
149        return {
150            "type": value.__name__,
151            "module": value.__module__,
152        }
153    # See
154    # https://github.com/encode/django-rest-framework/blob/master/rest_framework/utils/encoders.py
155    # For Date Time string spec, see ECMA 262
156    # https://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15
157    if isinstance(value, datetime):
158        representation = value.isoformat()
159        if representation.endswith("+00:00"):
160            representation = representation[:-6] + "Z"
161        return representation
162    if isinstance(value, date):
163        return value.isoformat()
164    if isinstance(value, time):
165        if timezone and timezone.is_aware(value):
166            raise ValueError("JSON can't represent timezone-aware times.")
167        return value.isoformat()
168    if isinstance(value, timedelta):
169        return cleanse_str(value.total_seconds())
170    if callable(value):
171        return {
172            "type": "callable",
173            "name": value.__name__,
174            "module": value.__module__,
175        }
176    # List taken from the stdlib's JSON encoder (_make_iterencode, encoder.py:415)
177    if isinstance(value, bool | int | float | NoneType | list | tuple | dict):
178        return value
179    try:
180        return DjangoJSONEncoder().default(value)
181    except TypeError:
182        return cleanse_str(value)
183    return cleanse_str(value)

Sanitize a single item, ensure it is JSON parsable

def sanitize_dict(source: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
186def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
187    """clean source of all Models that would interfere with the JSONField.
188    Models are replaced with a dictionary of {
189        app: str,
190        name: str,
191        pk: Any
192    }"""
193    final_dict = {}
194    for key, value in source.items():
195        new_value = sanitize_item(value)
196        if new_value is not ...:
197            final_dict[key] = new_value
198    return final_dict

clean source of all Models that would interfere with the JSONField. Models are replaced with a dictionary of { app: str, name: str, pk: Any }