authentik.events.utils
event utilities
"""event utilities"""

import re
from copy import copy
from dataclasses import asdict, is_dataclass
from datetime import date, datetime, time, timedelta
from enum import Enum
from pathlib import Path
from types import GeneratorType, NoneType
from typing import Any
from uuid import UUID

from django.contrib.auth.models import AnonymousUser
from django.core.handlers.wsgi import WSGIRequest
from django.core.serializers.json import DjangoJSONEncoder
from django.db import models
from django.db.models.base import Model
from django.http.request import HttpRequest
from django.utils import timezone
from django.views.debug import SafeExceptionReporterFilter
from geoip2.models import ASN, City
from guardian.conf import settings
from guardian.shortcuts import get_anonymous_user

from authentik.blueprints.v1.common import YAMLTag
from authentik.core.models import User
from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
from authentik.policies.types import PolicyRequest

# Special keys which are *not* cleaned, even when the default filter
# is matched
ALLOWED_SPECIAL_KEYS = re.compile(
    r"passing|password_change_date|^auth_method(_args)?$",
    flags=re.I,
)


def cleanse_str(raw: Any) -> str:
    """Convert `raw` to a string and remove all NUL characters from it"""
    return str(raw).replace("\u0000", "")


def cleanse_item(key: str, value: Any) -> Any:
    """Cleanse a single item.

    Dictionaries are cleansed recursively via `cleanse_dict`. Lists, tuples and
    sets are rebuilt with every element cleansed under the same `key`; the input
    container is never modified. Any other value is returned unchanged unless
    `key` matches Django's `SafeExceptionReporterFilter.hidden_settings` pattern
    and is not listed in `ALLOWED_SPECIAL_KEYS`, in which case
    `SafeExceptionReporterFilter.cleansed_substitute` is returned instead.
    """
    if isinstance(value, dict):
        return cleanse_dict(value)
    if isinstance(value, list | tuple | set):
        # Build a new container instead of assigning by index: tuples and sets
        # do not support item assignment, and the caller's list stays untouched.
        cleaned = [cleanse_item(key, item) for item in value]
        if isinstance(value, list):
            return cleaned
        return tuple(cleaned) if isinstance(value, tuple) else set(cleaned)
    try:
        if not SafeExceptionReporterFilter.hidden_settings.search(key):
            return value
        if ALLOWED_SPECIAL_KEYS.search(key):
            return value
        return SafeExceptionReporterFilter.cleansed_substitute
    except TypeError:  # pragma: no cover
        # `key` cannot be searched by the patterns (e.g. it is not a string)
        return value


def cleanse_dict(source: dict[Any, Any]) -> dict[Any, Any]:
    """Cleanse a dictionary, recursively.

    Returns a new dictionary; entries whose cleansed value is the `...` marker
    are left out.
    """
    return {
        key: cleansed
        for key, value in source.items()
        if (cleansed := cleanse_item(key, value)) is not ...
    }


def model_to_dict(model: Model) -> dict[str, Any]:
    """Convert model to dict.

    The result holds the model's app label, model name, primary key and a name,
    which is the model's `name` attribute when it has one and its cleansed
    string representation otherwise.
    """
    # Only stringify the model when it is actually needed as the fallback name.
    name = model.name if hasattr(model, "name") else cleanse_str(model)
    return {
        "app": model._meta.app_label,
        "model_name": model._meta.model_name,
        "pk": model.pk,
        "name": name,
    }


def get_user(user: User | AnonymousUser) -> dict[str, Any]:
    """Convert user object to dictionary.

    An `AnonymousUser` is replaced by the object `get_anonymous_user()` returns;
    if that lookup raises `User.DoesNotExist` an empty dictionary is returned.
    """
    resolved = user
    if isinstance(resolved, AnonymousUser):
        try:
            resolved = get_anonymous_user()
        except User.DoesNotExist:
            return {}
    details = {
        "username": resolved.username,
        "pk": resolved.pk,
        "email": resolved.email,
    }
    # Flag the user when its username is the configured anonymous user name
    if resolved.username == settings.ANONYMOUS_USER_NAME:
        details["is_anonymous"] = True
    return details


def sanitize_item(value: Any) -> Any:  # noqa: PLR0911, PLR0912
    """Sanitize a single item, ensure it is JSON parsable.

    Returns the `...` marker for HTTP request objects; `sanitize_dict` and the
    list handling below leave such entries out of their result.

    Raises `ValueError` for timezone-aware `time` values.
    """
    # `is_dataclass` is also true for dataclass *classes*, which `asdict` rejects;
    # classes are handled by the `type` branch further down.
    if is_dataclass(value) and not isinstance(value, type):
        # Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict,
        # and deepcopy doesn't work with HttpRequest (neither django nor rest_framework).
        # (more specifically doesn't work with ResolverMatch)
        # rest_framework's custom Request class makes this more complicated as it also holds a
        # thread lock.
        # Since this class is mainly used for Events which already hold the http request context
        # we just remove the http_request from the shallow policy request
        # Currently, the only dataclass that actually holds an http request is a PolicyRequest
        if isinstance(value, PolicyRequest) and value.http_request is not None:
            value = copy(value)
            value.http_request = None
        value = asdict(value)
    if isinstance(value, dict):
        return sanitize_dict(value)
    if isinstance(value, GeneratorType):
        return sanitize_item(list(value))
    if isinstance(value, list | tuple | set):
        new_values = []
        for item in value:
            new_value = sanitize_item(item)
            # Falsy results are dropped (long-standing behaviour). The `...` marker
            # is truthy, so it has to be excluded explicitly.
            if new_value and new_value is not ...:
                new_values.append(new_value)
        return new_values
    if isinstance(value, User | AnonymousUser):
        return sanitize_dict(get_user(value))
    if isinstance(value, models.Model):
        return sanitize_dict(model_to_dict(value))
    if isinstance(value, UUID):
        return value.hex
    if isinstance(value, HttpRequest | WSGIRequest):
        return ...
    if isinstance(value, City):
        return GEOIP_CONTEXT_PROCESSOR.city_to_dict(value)
    if isinstance(value, ASN):
        return ASN_CONTEXT_PROCESSOR.asn_to_dict(value)
    if isinstance(value, Path):
        return cleanse_str(value)
    if isinstance(value, Exception):
        return cleanse_str(value)
    if isinstance(value, YAMLTag):
        return cleanse_str(value)
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, type):
        return {
            "type": value.__name__,
            "module": value.__module__,
        }
    # See
    # https://github.com/encode/django-rest-framework/blob/master/rest_framework/utils/encoders.py
    # For Date Time string spec, see ECMA 262
    # https://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15
    # `datetime` is a subclass of `date`, so it has to be checked first
    if isinstance(value, datetime):
        representation = value.isoformat()
        if representation.endswith("+00:00"):
            representation = representation[:-6] + "Z"
        return representation
    if isinstance(value, date):
        return value.isoformat()
    if isinstance(value, time):
        if timezone.is_aware(value):
            raise ValueError("JSON can't represent timezone-aware times.")
        return value.isoformat()
    if isinstance(value, timedelta):
        return cleanse_str(value.total_seconds())
    if callable(value):
        # Not every callable has `__name__` (e.g. `functools.partial` objects or
        # instances defining `__call__`); fall back to the name of its class.
        return {
            "type": "callable",
            "name": getattr(value, "__name__", type(value).__name__),
            "module": getattr(value, "__module__", None),
        }
    if isinstance(value, str):
        # Same result as the encoder fallback below, without raising an exception
        return cleanse_str(value)
    # List taken from the stdlib's JSON encoder (_make_iterencode, encoder.py:415)
    if isinstance(value, bool | int | float | NoneType | list | tuple | dict):
        return value
    try:
        return DjangoJSONEncoder().default(value)
    except TypeError:
        return cleanse_str(value)


def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
    """clean source of all Models that would interfere with the JSONField.
    Models are replaced with a dictionary of {
        app: str,
        model_name: str,
        pk: Any,
        name: Any
    }

    Returns a new dictionary; entries whose sanitized value is the `...` marker
    are left out."""
    return {
        key: sanitized
        for key, value in source.items()
        if (sanitized := sanitize_item(value)) is not ...
    }
ALLOWED_SPECIAL_KEYS =
re.compile('passing|password_change_date|^auth_method(_args)?$', re.IGNORECASE)
def
cleanse_str(raw: Any) -> str:
def
cleanse_item(key: str, value: Any) -> Any:
44def cleanse_item(key: str, value: Any) -> Any: 45 """Cleanse a single item""" 46 if isinstance(value, dict): 47 return cleanse_dict(value) 48 if isinstance(value, list | tuple | set): 49 for idx, item in enumerate(value): 50 value[idx] = cleanse_item(key, item) 51 return value 52 try: 53 if not SafeExceptionReporterFilter.hidden_settings.search(key): 54 return value 55 if ALLOWED_SPECIAL_KEYS.search(key): 56 return value 57 return SafeExceptionReporterFilter.cleansed_substitute 58 except TypeError: # pragma: no cover 59 return value
Cleanse a single item
def
cleanse_dict(source: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
def cleanse_dict(source: dict[Any, Any]) -> dict[Any, Any]:
    """Cleanse a dictionary, recursively.

    Returns a new dictionary; entries whose cleansed value is the `...` marker
    are left out.
    """
    return {
        key: cleansed
        for key, value in source.items()
        if (cleansed := cleanse_item(key, value)) is not ...
    }
Cleanse a dictionary, recursively
def
model_to_dict(model: django.db.models.base.Model) -> dict[str, typing.Any]:
def model_to_dict(model: Model) -> dict[str, Any]:
    """Convert model to dict.

    The result holds the model's app label, model name, primary key and a name,
    which is the model's `name` attribute when it has one and its cleansed
    string representation otherwise.
    """
    # Only stringify the model when it is actually needed as the fallback name.
    name = model.name if hasattr(model, "name") else cleanse_str(model)
    return {
        "app": model._meta.app_label,
        "model_name": model._meta.model_name,
        "pk": model.pk,
        "name": name,
    }
Convert model to dict
def
get_user( user: authentik.core.models.User | django.contrib.auth.models.AnonymousUser) -> dict[str, typing.Any]:
def get_user(user: User | AnonymousUser) -> dict[str, Any]:
    """Convert user object to dictionary.

    An `AnonymousUser` is replaced by the object `get_anonymous_user()` returns;
    if that lookup raises `User.DoesNotExist` an empty dictionary is returned.
    """
    resolved = user
    if isinstance(resolved, AnonymousUser):
        try:
            resolved = get_anonymous_user()
        except User.DoesNotExist:
            return {}
    details = {
        "username": resolved.username,
        "pk": resolved.pk,
        "email": resolved.email,
    }
    # Flag the user when its username is the configured anonymous user name
    if resolved.username == settings.ANONYMOUS_USER_NAME:
        details["is_anonymous"] = True
    return details
Convert user object to dictionary
def
sanitize_item(value: Any) -> Any:
102def sanitize_item(value: Any) -> Any: # noqa: PLR0911, PLR0912 103 """Sanitize a single item, ensure it is JSON parsable""" 104 if is_dataclass(value): 105 # Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict, 106 # and deepcopy doesn't work with HttpRequest (neither django nor rest_framework). 107 # (more specifically doesn't work with ResolverMatch) 108 # rest_framework's custom Request class makes this more complicated as it also holds a 109 # thread lock. 110 # Since this class is mainly used for Events which already hold the http request context 111 # we just remove the http_request from the shallow policy request 112 # Currently, the only dataclass that actually holds an http request is a PolicyRequest 113 if isinstance(value, PolicyRequest) and value.http_request is not None: 114 value: PolicyRequest = copy(value) 115 value.http_request = None 116 value = asdict(value) 117 if isinstance(value, dict): 118 return sanitize_dict(value) 119 if isinstance(value, GeneratorType): 120 return sanitize_item(list(value)) 121 if isinstance(value, list | tuple | set): 122 new_values = [] 123 for item in value: 124 new_value = sanitize_item(item) 125 if new_value: 126 new_values.append(new_value) 127 return new_values 128 if isinstance(value, User | AnonymousUser): 129 return sanitize_dict(get_user(value)) 130 if isinstance(value, models.Model): 131 return sanitize_dict(model_to_dict(value)) 132 if isinstance(value, UUID): 133 return value.hex 134 if isinstance(value, HttpRequest | WSGIRequest): 135 return ... 
136 if isinstance(value, City): 137 return GEOIP_CONTEXT_PROCESSOR.city_to_dict(value) 138 if isinstance(value, ASN): 139 return ASN_CONTEXT_PROCESSOR.asn_to_dict(value) 140 if isinstance(value, Path): 141 return cleanse_str(value) 142 if isinstance(value, Exception): 143 return cleanse_str(value) 144 if isinstance(value, YAMLTag): 145 return cleanse_str(value) 146 if isinstance(value, Enum): 147 return value.value 148 if isinstance(value, type): 149 return { 150 "type": value.__name__, 151 "module": value.__module__, 152 } 153 # See 154 # https://github.com/encode/django-rest-framework/blob/master/rest_framework/utils/encoders.py 155 # For Date Time string spec, see ECMA 262 156 # https://ecma-international.org/ecma-262/5.1/#sec-15.9.1.15 157 if isinstance(value, datetime): 158 representation = value.isoformat() 159 if representation.endswith("+00:00"): 160 representation = representation[:-6] + "Z" 161 return representation 162 if isinstance(value, date): 163 return value.isoformat() 164 if isinstance(value, time): 165 if timezone and timezone.is_aware(value): 166 raise ValueError("JSON can't represent timezone-aware times.") 167 return value.isoformat() 168 if isinstance(value, timedelta): 169 return cleanse_str(value.total_seconds()) 170 if callable(value): 171 return { 172 "type": "callable", 173 "name": value.__name__, 174 "module": value.__module__, 175 } 176 # List taken from the stdlib's JSON encoder (_make_iterencode, encoder.py:415) 177 if isinstance(value, bool | int | float | NoneType | list | tuple | dict): 178 return value 179 try: 180 return DjangoJSONEncoder().default(value) 181 except TypeError: 182 return cleanse_str(value) 183 return cleanse_str(value)
Sanitize a single item, ensure it is JSON parsable
def
sanitize_dict(source: dict[typing.Any, typing.Any]) -> dict[typing.Any, typing.Any]:
def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
    """Build a copy of `source` in which every value went through `sanitize_item`,
    so that it no longer holds Models that would interfere with the JSONField.

    Entries whose sanitized value is the `...` marker are left out."""
    return {
        key: sanitized
        for key, value in source.items()
        if (sanitized := sanitize_item(value)) is not ...
    }
Clean source of all Models that would interfere with the JSONField. Models are replaced with a dictionary of { app: str, model_name: str, pk: Any, name: Any }