authentik.lib.config
authentik core config loader
1"""authentik core config loader""" 2 3import base64 4import json 5import os 6from collections.abc import Mapping 7from contextlib import contextmanager 8from copy import deepcopy 9from dataclasses import dataclass, field 10from enum import Enum 11from glob import glob 12from json import JSONEncoder, dumps, loads 13from json.decoder import JSONDecodeError 14from pathlib import Path 15from sys import argv, stderr 16from time import time 17from typing import Any 18from urllib.parse import urlparse 19 20import yaml 21from django.conf import ImproperlyConfigured 22 23from authentik.lib.utils.dict import delete_path_in_dict, get_path_from_dict, set_path_in_dict 24 25SEARCH_PATHS = ["authentik/lib/default.yml", "/etc/authentik/config.yml", ""] + glob( 26 "/etc/authentik/config.d/*.yml", recursive=True 27) 28ENV_PREFIX = "AUTHENTIK" 29ENVIRONMENT = os.getenv(f"{ENV_PREFIX}_ENV", "local") 30 31# Old key -> new key 32DEPRECATIONS = { 33 "geoip": "events.context_processors.geoip", 34 "worker.concurrency": "worker.threads", 35} 36 37 38@dataclass(slots=True) 39class Attr: 40 """Single configuration attribute""" 41 42 class Source(Enum): 43 """Sources a configuration attribute can come from, determines what should be done with 44 Attr.source (and if it's set at all)""" 45 46 UNSPECIFIED = "unspecified" 47 ENV = "env" 48 CONFIG_FILE = "config_file" 49 URI = "uri" 50 51 value: Any 52 53 source_type: Source = field(default=Source.UNSPECIFIED) 54 55 # depending on source_type, might contain the environment variable or the path 56 # to the config file containing this change or the file containing this value 57 source: str | None = field(default=None) 58 59 def __post_init__(self): 60 if isinstance(self.value, Attr): 61 raise RuntimeError(f"config Attr with nested Attr for source {self.source}") 62 63 64class AttrEncoder(JSONEncoder): 65 """JSON encoder that can deal with `Attr` classes""" 66 67 def default(self, o: Any) -> Any: 68 if isinstance(o, Attr): 69 return o.value 70 return super().default(o) 71 72 73class UNSET: 74 """Used to test whether configuration key has not been set.""" 75 76 77class ConfigLoader: 78 """Search through SEARCH_PATHS and load configuration. Environment variables starting with 79 `ENV_PREFIX` are also applied. 80 81 A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host""" 82 83 deprecations: dict[tuple[str, str], str] = {} 84 85 def __init__(self, **kwargs): 86 super().__init__() 87 self.__config = {} 88 base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve() 89 for _path in SEARCH_PATHS: 90 path = Path(_path) 91 # Check if path is relative, and if so join with base_dir 92 if not path.is_absolute(): 93 path = base_dir / path 94 if path.is_file() and path.exists(): 95 # Path is an existing file, so we just read it and update our config with it 96 self.update_from_file(path) 97 elif path.is_dir() and path.exists(): 98 # Path is an existing dir, so we try to read the env config from it 99 env_paths = [ 100 path / Path(ENVIRONMENT + ".yml"), 101 path / Path(ENVIRONMENT + ".env.yml"), 102 path / Path(ENVIRONMENT + ".yaml"), 103 path / Path(ENVIRONMENT + ".env.yaml"), 104 ] 105 for env_file in env_paths: 106 if env_file.is_file() and env_file.exists(): 107 # Update config with env file 108 self.update_from_file(env_file) 109 self.update_from_env() 110 self.update(self.__config, kwargs) 111 self.deprecations = self.check_deprecations() 112 113 def check_deprecations(self) -> dict[str, str]: 114 """Warn if any deprecated configuration options are used""" 115 116 def _pop_deprecated_key(current_obj, dot_parts, index): 117 """Recursive function to remove deprecated keys in configuration""" 118 dot_part = dot_parts[index] 119 if index == len(dot_parts) - 1: 120 return current_obj.pop(dot_part) 121 value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1) 122 if not current_obj[dot_part]: 123 current_obj.pop(dot_part) 124 return value 125 126 deprecation_replacements = {} 127 for deprecation, replacement in DEPRECATIONS.items(): 128 if self.get(deprecation, default=UNSET) is UNSET: 129 continue 130 message = ( 131 f"'{deprecation}' has been deprecated in favor of '{replacement}'! " 132 + "Please update your configuration." 133 ) 134 self.log( 135 "warning", 136 message, 137 ) 138 deprecation_replacements[(deprecation, replacement)] = message 139 140 deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0) 141 self.set(replacement, deprecated_attr) 142 return deprecation_replacements 143 144 def log(self, level: str, message: str, **kwargs): 145 """Custom Log method, we want to ensure ConfigLoader always logs JSON even when 146 'structlog' or 'logging' hasn't been configured yet.""" 147 output = { 148 "event": message, 149 "level": level, 150 "logger": self.__class__.__module__, 151 "timestamp": time(), 152 } 153 output.update(kwargs) 154 print(dumps(output), file=stderr) 155 156 def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]: 157 """Recursively update dictionary""" 158 for key, raw_value in updatee.items(): 159 if isinstance(raw_value, Mapping): 160 root[key] = self.update(root.get(key, {}), raw_value) 161 else: 162 if isinstance(raw_value, str): 163 value = self.parse_uri(raw_value) 164 elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str): 165 value = self.parse_uri(raw_value.value) 166 elif not isinstance(raw_value, Attr): 167 value = Attr(raw_value) 168 else: 169 value = raw_value 170 root[key] = value 171 return root 172 173 def refresh(self, key: str, default=None, sep=".") -> Any: 174 """Update a single value""" 175 attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default)) 176 if attr.source_type != Attr.Source.URI: 177 return attr.value 178 attr.value = self.parse_uri(attr.source).value 179 return attr.value 180 181 def parse_uri(self, value: str) -> Attr: 182 """Parse string values which start with a URI""" 183 url = urlparse(value) 184 parsed_value = value 185 if url.scheme == "env": 186 parsed_value = os.getenv(url.netloc, url.query) 187 if url.scheme == "file": 188 try: 189 with open(url.path, encoding="utf8") as _file: 190 parsed_value = _file.read().strip() 191 except OSError as exc: 192 self.log("error", f"Failed to read config value from {url.path}: {exc}") 193 parsed_value = url.query 194 return Attr(parsed_value, Attr.Source.URI, value) 195 196 def update_from_file(self, path: Path): 197 """Update config from file contents""" 198 try: 199 with open(path, encoding="utf8") as file: 200 try: 201 self.update(self.__config, yaml.safe_load(file)) 202 self.log("debug", "Loaded config", file=str(path)) 203 except yaml.YAMLError as exc: 204 raise ImproperlyConfigured from exc 205 except PermissionError as exc: 206 self.log( 207 "warning", 208 "Permission denied while reading file", 209 path=path, 210 error=str(exc), 211 ) 212 213 def update_from_dict(self, update: dict): 214 """Update config from dict""" 215 self.__config.update(update) 216 217 def update_from_env(self): 218 """Check environment variables""" 219 outer = {} 220 idx = 0 221 for key, value in os.environ.items(): 222 if not key.startswith(ENV_PREFIX): 223 continue 224 relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower() 225 # Check if the value is json, and try to load it 226 try: 227 value = loads(value) # noqa: PLW2901 228 except JSONDecodeError: 229 pass 230 attr_value = Attr(value, Attr.Source.ENV, relative_key) 231 set_path_in_dict(outer, relative_key, attr_value) 232 idx += 1 233 if idx > 0: 234 self.log("debug", "Loaded environment variables", count=idx) 235 self.update(self.__config, outer) 236 237 @contextmanager 238 def patch(self, path: str, value: Any): 239 """Context manager for unittests to patch a value""" 240 original_value = self.get(path, UNSET) 241 self.set(path, value) 242 try: 243 yield 244 finally: 245 if original_value is not UNSET: 246 self.set(path, original_value) 247 else: 248 self.delete(path) 249 250 @property 251 def raw(self) -> dict: 252 """Get raw config dictionary""" 253 return self.__config 254 255 def get(self, path: str, default=None, sep=".") -> Any: 256 """Access attribute by using yaml path""" 257 # Walk sub_dicts before parsing path 258 root = self.raw 259 # Walk each component of the path 260 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default)) 261 return attr.value 262 263 def get_int(self, path: str, default=0) -> int: 264 """Wrapper for get that converts value into int""" 265 try: 266 return int(self.get(path, default)) 267 except ValueError as exc: 268 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 269 return default 270 271 def get_optional_int(self, path: str, default=None) -> int | None: 272 """Wrapper for get that converts value into int or None if set""" 273 value = self.get(path, UNSET) 274 if value is UNSET: 275 return default 276 try: 277 return int(value) 278 except (ValueError, TypeError) as exc: 279 if value is None or (isinstance(value, str) and value.lower() == "null"): 280 return None 281 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 282 return default 283 284 def get_bool(self, path: str, default=False) -> bool: 285 """Wrapper for get that converts value into boolean""" 286 value = self.get(path, UNSET) 287 if value is UNSET: 288 return default 289 return str(self.get(path)).lower() == "true" 290 291 def get_keys(self, path: str, sep=".") -> list[str]: 292 """List attribute keys by using yaml path""" 293 root = self.raw 294 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({})) 295 return attr.keys() 296 297 def get_dict_from_b64_json(self, path: str, default=None) -> dict: 298 """Wrapper for get that converts value from Base64 encoded string into dictionary""" 299 config_value = self.get(path) 300 if config_value is None: 301 return {} 302 try: 303 b64decoded_str = base64.b64decode(config_value).decode("utf-8") 304 return json.loads(b64decoded_str) 305 except (JSONDecodeError, TypeError, ValueError) as exc: 306 self.log( 307 "warning", 308 f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}", 309 ) 310 return default if isinstance(default, dict) else {} 311 312 def set(self, path: str, value: Any, sep="."): 313 """Set value using same syntax as get()""" 314 if not isinstance(value, Attr): 315 value = Attr(value) 316 set_path_in_dict(self.raw, path, value, sep=sep) 317 318 def delete(self, path: str, sep="."): 319 delete_path_in_dict(self.raw, path, sep=sep) 320 321 322CONFIG = ConfigLoader() 323 324 325def django_db_config(config: ConfigLoader | None = None) -> dict: 326 if not config: 327 config = CONFIG 328 329 pool_options = False 330 use_pool = config.get_bool("postgresql.use_pool", False) 331 if use_pool: 332 pool_options = config.get_dict_from_b64_json("postgresql.pool_options", True) 333 if not pool_options: 334 pool_options = True 335 # FIXME: Temporarily force pool to be deactivated. 336 # See https://github.com/goauthentik/authentik/issues/14320 337 pool_options = False 338 339 conn_options = config.get_dict_from_b64_json("postgresql.conn_options", default={}) 340 341 db = { 342 "default": { 343 "ENGINE": "psqlextra.backend", 344 "HOST": config.get("postgresql.host"), 345 "NAME": config.get("postgresql.name"), 346 "USER": config.get("postgresql.user"), 347 "PASSWORD": config.get("postgresql.password"), 348 "PORT": config.get("postgresql.port"), 349 "OPTIONS": { 350 "sslmode": config.get("postgresql.sslmode"), 351 "sslrootcert": config.get("postgresql.sslrootcert"), 352 "sslcert": config.get("postgresql.sslcert"), 353 "sslkey": config.get("postgresql.sslkey"), 354 "pool": pool_options, 355 **conn_options, 356 }, 357 "CONN_MAX_AGE": config.get_optional_int("postgresql.conn_max_age", 0), 358 "CONN_HEALTH_CHECKS": config.get_bool("postgresql.conn_health_checks", False), 359 "DISABLE_SERVER_SIDE_CURSORS": config.get_bool( 360 "postgresql.disable_server_side_cursors", False 361 ), 362 "TEST": { 363 "NAME": config.get("postgresql.test.name"), 364 }, 365 } 366 } 367 368 conn_max_age = config.get_optional_int("postgresql.conn_max_age", UNSET) 369 disable_server_side_cursors = config.get_bool("postgresql.disable_server_side_cursors", UNSET) 370 if config.get_bool("postgresql.use_pgpool", False): 371 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True 372 if disable_server_side_cursors is not UNSET: 373 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors 374 375 if config.get_bool("postgresql.use_pgbouncer", False): 376 # https://docs.djangoproject.com/en/4.0/ref/databases/#transaction-pooling-server-side-cursors 377 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True 378 # https://docs.djangoproject.com/en/4.0/ref/databases/#persistent-connections 379 db["default"]["CONN_MAX_AGE"] = None # persistent 380 if disable_server_side_cursors is not UNSET: 381 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors 382 if conn_max_age is not UNSET: 383 db["default"]["CONN_MAX_AGE"] = conn_max_age 384 385 all_replica_conn_options = config.get_dict_from_b64_json( 386 "postgresql.replica_conn_options", 387 default={}, 388 ) 389 390 for replica in config.get_keys("postgresql.read_replicas"): 391 _database = deepcopy(db["default"]) 392 393 for setting, current_value in db["default"].items(): 394 if isinstance(current_value, dict): 395 continue 396 override = config.get( 397 f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET 398 ) 399 if override is not UNSET: 400 _database[setting] = override 401 402 for option in conn_options.keys(): 403 _database["OPTIONS"].pop(option, None) 404 405 for setting in db["default"]["OPTIONS"].keys(): 406 override = config.get( 407 f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET 408 ) 409 if override is not UNSET: 410 _database["OPTIONS"][setting] = override 411 412 _database["OPTIONS"].update(all_replica_conn_options) 413 replica_conn_options = config.get_dict_from_b64_json( 414 f"postgresql.read_replicas.{replica}.conn_options", default={} 415 ) 416 _database["OPTIONS"].update(replica_conn_options) 417 418 db[f"replica_{replica}"] = _database 419 return db 420 421 422if __name__ == "__main__": 423 if len(argv) < 2: # noqa: PLR2004 424 print(dumps(CONFIG.raw, indent=4, cls=AttrEncoder)) 425 else: 426 for arg in argv[1:]: 427 print(CONFIG.get(arg))
39@dataclass(slots=True) 40class Attr: 41 """Single configuration attribute""" 42 43 class Source(Enum): 44 """Sources a configuration attribute can come from, determines what should be done with 45 Attr.source (and if it's set at all)""" 46 47 UNSPECIFIED = "unspecified" 48 ENV = "env" 49 CONFIG_FILE = "config_file" 50 URI = "uri" 51 52 value: Any 53 54 source_type: Source = field(default=Source.UNSPECIFIED) 55 56 # depending on source_type, might contain the environment variable or the path 57 # to the config file containing this change or the file containing this value 58 source: str | None = field(default=None) 59 60 def __post_init__(self): 61 if isinstance(self.value, Attr): 62 raise RuntimeError(f"config Attr with nested Attr for source {self.source}")
Single configuration attribute
43 class Source(Enum): 44 """Sources a configuration attribute can come from, determines what should be done with 45 Attr.source (and if it's set at all)""" 46 47 UNSPECIFIED = "unspecified" 48 ENV = "env" 49 CONFIG_FILE = "config_file" 50 URI = "uri"
Sources a configuration attribute can come from, determines what should be done with Attr.source (and if it's set at all)
65class AttrEncoder(JSONEncoder): 66 """JSON encoder that can deal with `Attr` classes""" 67 68 def default(self, o: Any) -> Any: 69 if isinstance(o, Attr): 70 return o.value 71 return super().default(o)
JSON encoder that can deal with Attr classes
68 def default(self, o: Any) -> Any: 69 if isinstance(o, Attr): 70 return o.value 71 return super().default(o)
Implement this method in a subclass such that it returns
a serializable object for o, or calls the base implementation
(to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return super().default(o)
Used to test whether configuration key has not been set.
78class ConfigLoader: 79 """Search through SEARCH_PATHS and load configuration. Environment variables starting with 80 `ENV_PREFIX` are also applied. 81 82 A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host""" 83 84 deprecations: dict[tuple[str, str], str] = {} 85 86 def __init__(self, **kwargs): 87 super().__init__() 88 self.__config = {} 89 base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve() 90 for _path in SEARCH_PATHS: 91 path = Path(_path) 92 # Check if path is relative, and if so join with base_dir 93 if not path.is_absolute(): 94 path = base_dir / path 95 if path.is_file() and path.exists(): 96 # Path is an existing file, so we just read it and update our config with it 97 self.update_from_file(path) 98 elif path.is_dir() and path.exists(): 99 # Path is an existing dir, so we try to read the env config from it 100 env_paths = [ 101 path / Path(ENVIRONMENT + ".yml"), 102 path / Path(ENVIRONMENT + ".env.yml"), 103 path / Path(ENVIRONMENT + ".yaml"), 104 path / Path(ENVIRONMENT + ".env.yaml"), 105 ] 106 for env_file in env_paths: 107 if env_file.is_file() and env_file.exists(): 108 # Update config with env file 109 self.update_from_file(env_file) 110 self.update_from_env() 111 self.update(self.__config, kwargs) 112 self.deprecations = self.check_deprecations() 113 114 def check_deprecations(self) -> dict[str, str]: 115 """Warn if any deprecated configuration options are used""" 116 117 def _pop_deprecated_key(current_obj, dot_parts, index): 118 """Recursive function to remove deprecated keys in configuration""" 119 dot_part = dot_parts[index] 120 if index == len(dot_parts) - 1: 121 return current_obj.pop(dot_part) 122 value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1) 123 if not current_obj[dot_part]: 124 current_obj.pop(dot_part) 125 return value 126 127 deprecation_replacements = {} 128 for deprecation, replacement in DEPRECATIONS.items(): 129 if self.get(deprecation, default=UNSET) is UNSET: 130 continue 131 message = ( 132 f"'{deprecation}' has been deprecated in favor of '{replacement}'! " 133 + "Please update your configuration." 134 ) 135 self.log( 136 "warning", 137 message, 138 ) 139 deprecation_replacements[(deprecation, replacement)] = message 140 141 deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0) 142 self.set(replacement, deprecated_attr) 143 return deprecation_replacements 144 145 def log(self, level: str, message: str, **kwargs): 146 """Custom Log method, we want to ensure ConfigLoader always logs JSON even when 147 'structlog' or 'logging' hasn't been configured yet.""" 148 output = { 149 "event": message, 150 "level": level, 151 "logger": self.__class__.__module__, 152 "timestamp": time(), 153 } 154 output.update(kwargs) 155 print(dumps(output), file=stderr) 156 157 def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]: 158 """Recursively update dictionary""" 159 for key, raw_value in updatee.items(): 160 if isinstance(raw_value, Mapping): 161 root[key] = self.update(root.get(key, {}), raw_value) 162 else: 163 if isinstance(raw_value, str): 164 value = self.parse_uri(raw_value) 165 elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str): 166 value = self.parse_uri(raw_value.value) 167 elif not isinstance(raw_value, Attr): 168 value = Attr(raw_value) 169 else: 170 value = raw_value 171 root[key] = value 172 return root 173 174 def refresh(self, key: str, default=None, sep=".") -> Any: 175 """Update a single value""" 176 attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default)) 177 if attr.source_type != Attr.Source.URI: 178 return attr.value 179 attr.value = self.parse_uri(attr.source).value 180 return attr.value 181 182 def parse_uri(self, value: str) -> Attr: 183 """Parse string values which start with a URI""" 184 url = urlparse(value) 185 parsed_value = value 186 if url.scheme == "env": 187 parsed_value = os.getenv(url.netloc, url.query) 188 if url.scheme == "file": 189 try: 190 with open(url.path, encoding="utf8") as _file: 191 parsed_value = _file.read().strip() 192 except OSError as exc: 193 self.log("error", f"Failed to read config value from {url.path}: {exc}") 194 parsed_value = url.query 195 return Attr(parsed_value, Attr.Source.URI, value) 196 197 def update_from_file(self, path: Path): 198 """Update config from file contents""" 199 try: 200 with open(path, encoding="utf8") as file: 201 try: 202 self.update(self.__config, yaml.safe_load(file)) 203 self.log("debug", "Loaded config", file=str(path)) 204 except yaml.YAMLError as exc: 205 raise ImproperlyConfigured from exc 206 except PermissionError as exc: 207 self.log( 208 "warning", 209 "Permission denied while reading file", 210 path=path, 211 error=str(exc), 212 ) 213 214 def update_from_dict(self, update: dict): 215 """Update config from dict""" 216 self.__config.update(update) 217 218 def update_from_env(self): 219 """Check environment variables""" 220 outer = {} 221 idx = 0 222 for key, value in os.environ.items(): 223 if not key.startswith(ENV_PREFIX): 224 continue 225 relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower() 226 # Check if the value is json, and try to load it 227 try: 228 value = loads(value) # noqa: PLW2901 229 except JSONDecodeError: 230 pass 231 attr_value = Attr(value, Attr.Source.ENV, relative_key) 232 set_path_in_dict(outer, relative_key, attr_value) 233 idx += 1 234 if idx > 0: 235 self.log("debug", "Loaded environment variables", count=idx) 236 self.update(self.__config, outer) 237 238 @contextmanager 239 def patch(self, path: str, value: Any): 240 """Context manager for unittests to patch a value""" 241 original_value = self.get(path, UNSET) 242 self.set(path, value) 243 try: 244 yield 245 finally: 246 if original_value is not UNSET: 247 self.set(path, original_value) 248 else: 249 self.delete(path) 250 251 @property 252 def raw(self) -> dict: 253 """Get raw config dictionary""" 254 return self.__config 255 256 def get(self, path: str, default=None, sep=".") -> Any: 257 """Access attribute by using yaml path""" 258 # Walk sub_dicts before parsing path 259 root = self.raw 260 # Walk each component of the path 261 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default)) 262 return attr.value 263 264 def get_int(self, path: str, default=0) -> int: 265 """Wrapper for get that converts value into int""" 266 try: 267 return int(self.get(path, default)) 268 except ValueError as exc: 269 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 270 return default 271 272 def get_optional_int(self, path: str, default=None) -> int | None: 273 """Wrapper for get that converts value into int or None if set""" 274 value = self.get(path, UNSET) 275 if value is UNSET: 276 return default 277 try: 278 return int(value) 279 except (ValueError, TypeError) as exc: 280 if value is None or (isinstance(value, str) and value.lower() == "null"): 281 return None 282 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 283 return default 284 285 def get_bool(self, path: str, default=False) -> bool: 286 """Wrapper for get that converts value into boolean""" 287 value = self.get(path, UNSET) 288 if value is UNSET: 289 return default 290 return str(self.get(path)).lower() == "true" 291 292 def get_keys(self, path: str, sep=".") -> list[str]: 293 """List attribute keys by using yaml path""" 294 root = self.raw 295 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({})) 296 return attr.keys() 297 298 def get_dict_from_b64_json(self, path: str, default=None) -> dict: 299 """Wrapper for get that converts value from Base64 encoded string into dictionary""" 300 config_value = self.get(path) 301 if config_value is None: 302 return {} 303 try: 304 b64decoded_str = base64.b64decode(config_value).decode("utf-8") 305 return json.loads(b64decoded_str) 306 except (JSONDecodeError, TypeError, ValueError) as exc: 307 self.log( 308 "warning", 309 f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}", 310 ) 311 return default if isinstance(default, dict) else {} 312 313 def set(self, path: str, value: Any, sep="."): 314 """Set value using same syntax as get()""" 315 if not isinstance(value, Attr): 316 value = Attr(value) 317 set_path_in_dict(self.raw, path, value, sep=sep) 318 319 def delete(self, path: str, sep="."): 320 delete_path_in_dict(self.raw, path, sep=sep)
Search through SEARCH_PATHS and load configuration. Environment variables starting with
ENV_PREFIX are also applied.
A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host
86 def __init__(self, **kwargs): 87 super().__init__() 88 self.__config = {} 89 base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve() 90 for _path in SEARCH_PATHS: 91 path = Path(_path) 92 # Check if path is relative, and if so join with base_dir 93 if not path.is_absolute(): 94 path = base_dir / path 95 if path.is_file() and path.exists(): 96 # Path is an existing file, so we just read it and update our config with it 97 self.update_from_file(path) 98 elif path.is_dir() and path.exists(): 99 # Path is an existing dir, so we try to read the env config from it 100 env_paths = [ 101 path / Path(ENVIRONMENT + ".yml"), 102 path / Path(ENVIRONMENT + ".env.yml"), 103 path / Path(ENVIRONMENT + ".yaml"), 104 path / Path(ENVIRONMENT + ".env.yaml"), 105 ] 106 for env_file in env_paths: 107 if env_file.is_file() and env_file.exists(): 108 # Update config with env file 109 self.update_from_file(env_file) 110 self.update_from_env() 111 self.update(self.__config, kwargs) 112 self.deprecations = self.check_deprecations()
114 def check_deprecations(self) -> dict[str, str]: 115 """Warn if any deprecated configuration options are used""" 116 117 def _pop_deprecated_key(current_obj, dot_parts, index): 118 """Recursive function to remove deprecated keys in configuration""" 119 dot_part = dot_parts[index] 120 if index == len(dot_parts) - 1: 121 return current_obj.pop(dot_part) 122 value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1) 123 if not current_obj[dot_part]: 124 current_obj.pop(dot_part) 125 return value 126 127 deprecation_replacements = {} 128 for deprecation, replacement in DEPRECATIONS.items(): 129 if self.get(deprecation, default=UNSET) is UNSET: 130 continue 131 message = ( 132 f"'{deprecation}' has been deprecated in favor of '{replacement}'! " 133 + "Please update your configuration." 134 ) 135 self.log( 136 "warning", 137 message, 138 ) 139 deprecation_replacements[(deprecation, replacement)] = message 140 141 deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0) 142 self.set(replacement, deprecated_attr) 143 return deprecation_replacements
Warn if any deprecated configuration options are used
145 def log(self, level: str, message: str, **kwargs): 146 """Custom Log method, we want to ensure ConfigLoader always logs JSON even when 147 'structlog' or 'logging' hasn't been configured yet.""" 148 output = { 149 "event": message, 150 "level": level, 151 "logger": self.__class__.__module__, 152 "timestamp": time(), 153 } 154 output.update(kwargs) 155 print(dumps(output), file=stderr)
Custom Log method, we want to ensure ConfigLoader always logs JSON even when 'structlog' or 'logging' hasn't been configured yet.
157 def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]: 158 """Recursively update dictionary""" 159 for key, raw_value in updatee.items(): 160 if isinstance(raw_value, Mapping): 161 root[key] = self.update(root.get(key, {}), raw_value) 162 else: 163 if isinstance(raw_value, str): 164 value = self.parse_uri(raw_value) 165 elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str): 166 value = self.parse_uri(raw_value.value) 167 elif not isinstance(raw_value, Attr): 168 value = Attr(raw_value) 169 else: 170 value = raw_value 171 root[key] = value 172 return root
Recursively update dictionary
174 def refresh(self, key: str, default=None, sep=".") -> Any: 175 """Update a single value""" 176 attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default)) 177 if attr.source_type != Attr.Source.URI: 178 return attr.value 179 attr.value = self.parse_uri(attr.source).value 180 return attr.value
Update a single value
182 def parse_uri(self, value: str) -> Attr: 183 """Parse string values which start with a URI""" 184 url = urlparse(value) 185 parsed_value = value 186 if url.scheme == "env": 187 parsed_value = os.getenv(url.netloc, url.query) 188 if url.scheme == "file": 189 try: 190 with open(url.path, encoding="utf8") as _file: 191 parsed_value = _file.read().strip() 192 except OSError as exc: 193 self.log("error", f"Failed to read config value from {url.path}: {exc}") 194 parsed_value = url.query 195 return Attr(parsed_value, Attr.Source.URI, value)
Parse string values which start with a URI
197 def update_from_file(self, path: Path): 198 """Update config from file contents""" 199 try: 200 with open(path, encoding="utf8") as file: 201 try: 202 self.update(self.__config, yaml.safe_load(file)) 203 self.log("debug", "Loaded config", file=str(path)) 204 except yaml.YAMLError as exc: 205 raise ImproperlyConfigured from exc 206 except PermissionError as exc: 207 self.log( 208 "warning", 209 "Permission denied while reading file", 210 path=path, 211 error=str(exc), 212 )
Update config from file contents
214 def update_from_dict(self, update: dict): 215 """Update config from dict""" 216 self.__config.update(update)
Update config from dict
218 def update_from_env(self): 219 """Check environment variables""" 220 outer = {} 221 idx = 0 222 for key, value in os.environ.items(): 223 if not key.startswith(ENV_PREFIX): 224 continue 225 relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower() 226 # Check if the value is json, and try to load it 227 try: 228 value = loads(value) # noqa: PLW2901 229 except JSONDecodeError: 230 pass 231 attr_value = Attr(value, Attr.Source.ENV, relative_key) 232 set_path_in_dict(outer, relative_key, attr_value) 233 idx += 1 234 if idx > 0: 235 self.log("debug", "Loaded environment variables", count=idx) 236 self.update(self.__config, outer)
Check environment variables
238 @contextmanager 239 def patch(self, path: str, value: Any): 240 """Context manager for unittests to patch a value""" 241 original_value = self.get(path, UNSET) 242 self.set(path, value) 243 try: 244 yield 245 finally: 246 if original_value is not UNSET: 247 self.set(path, original_value) 248 else: 249 self.delete(path)
Context manager for unittests to patch a value
251 @property 252 def raw(self) -> dict: 253 """Get raw config dictionary""" 254 return self.__config
Get raw config dictionary
256 def get(self, path: str, default=None, sep=".") -> Any: 257 """Access attribute by using yaml path""" 258 # Walk sub_dicts before parsing path 259 root = self.raw 260 # Walk each component of the path 261 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default)) 262 return attr.value
Access attribute by using yaml path
264 def get_int(self, path: str, default=0) -> int: 265 """Wrapper for get that converts value into int""" 266 try: 267 return int(self.get(path, default)) 268 except ValueError as exc: 269 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 270 return default
Wrapper for get that converts value into int
272 def get_optional_int(self, path: str, default=None) -> int | None: 273 """Wrapper for get that converts value into int or None if set""" 274 value = self.get(path, UNSET) 275 if value is UNSET: 276 return default 277 try: 278 return int(value) 279 except (ValueError, TypeError) as exc: 280 if value is None or (isinstance(value, str) and value.lower() == "null"): 281 return None 282 self.log("warning", "Failed to parse config as int", path=path, exc=str(exc)) 283 return default
Wrapper for get that converts value into int or None if set
285 def get_bool(self, path: str, default=False) -> bool: 286 """Wrapper for get that converts value into boolean""" 287 value = self.get(path, UNSET) 288 if value is UNSET: 289 return default 290 return str(self.get(path)).lower() == "true"
Wrapper for get that converts value into boolean
292 def get_keys(self, path: str, sep=".") -> list[str]: 293 """List attribute keys by using yaml path""" 294 root = self.raw 295 attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({})) 296 return attr.keys()
List attribute keys by using yaml path
298 def get_dict_from_b64_json(self, path: str, default=None) -> dict: 299 """Wrapper for get that converts value from Base64 encoded string into dictionary""" 300 config_value = self.get(path) 301 if config_value is None: 302 return {} 303 try: 304 b64decoded_str = base64.b64decode(config_value).decode("utf-8") 305 return json.loads(b64decoded_str) 306 except (JSONDecodeError, TypeError, ValueError) as exc: 307 self.log( 308 "warning", 309 f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}", 310 ) 311 return default if isinstance(default, dict) else {}
Wrapper for get that converts value from Base64 encoded string into dictionary
326def django_db_config(config: ConfigLoader | None = None) -> dict: 327 if not config: 328 config = CONFIG 329 330 pool_options = False 331 use_pool = config.get_bool("postgresql.use_pool", False) 332 if use_pool: 333 pool_options = config.get_dict_from_b64_json("postgresql.pool_options", True) 334 if not pool_options: 335 pool_options = True 336 # FIXME: Temporarily force pool to be deactivated. 337 # See https://github.com/goauthentik/authentik/issues/14320 338 pool_options = False 339 340 conn_options = config.get_dict_from_b64_json("postgresql.conn_options", default={}) 341 342 db = { 343 "default": { 344 "ENGINE": "psqlextra.backend", 345 "HOST": config.get("postgresql.host"), 346 "NAME": config.get("postgresql.name"), 347 "USER": config.get("postgresql.user"), 348 "PASSWORD": config.get("postgresql.password"), 349 "PORT": config.get("postgresql.port"), 350 "OPTIONS": { 351 "sslmode": config.get("postgresql.sslmode"), 352 "sslrootcert": config.get("postgresql.sslrootcert"), 353 "sslcert": config.get("postgresql.sslcert"), 354 "sslkey": config.get("postgresql.sslkey"), 355 "pool": pool_options, 356 **conn_options, 357 }, 358 "CONN_MAX_AGE": config.get_optional_int("postgresql.conn_max_age", 0), 359 "CONN_HEALTH_CHECKS": config.get_bool("postgresql.conn_health_checks", False), 360 "DISABLE_SERVER_SIDE_CURSORS": config.get_bool( 361 "postgresql.disable_server_side_cursors", False 362 ), 363 "TEST": { 364 "NAME": config.get("postgresql.test.name"), 365 }, 366 } 367 } 368 369 conn_max_age = config.get_optional_int("postgresql.conn_max_age", UNSET) 370 disable_server_side_cursors = config.get_bool("postgresql.disable_server_side_cursors", UNSET) 371 if config.get_bool("postgresql.use_pgpool", False): 372 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True 373 if disable_server_side_cursors is not UNSET: 374 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors 375 376 if config.get_bool("postgresql.use_pgbouncer", False): 377 # https://docs.djangoproject.com/en/4.0/ref/databases/#transaction-pooling-server-side-cursors 378 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True 379 # https://docs.djangoproject.com/en/4.0/ref/databases/#persistent-connections 380 db["default"]["CONN_MAX_AGE"] = None # persistent 381 if disable_server_side_cursors is not UNSET: 382 db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors 383 if conn_max_age is not UNSET: 384 db["default"]["CONN_MAX_AGE"] = conn_max_age 385 386 all_replica_conn_options = config.get_dict_from_b64_json( 387 "postgresql.replica_conn_options", 388 default={}, 389 ) 390 391 for replica in config.get_keys("postgresql.read_replicas"): 392 _database = deepcopy(db["default"]) 393 394 for setting, current_value in db["default"].items(): 395 if isinstance(current_value, dict): 396 continue 397 override = config.get( 398 f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET 399 ) 400 if override is not UNSET: 401 _database[setting] = override 402 403 for option in conn_options.keys(): 404 _database["OPTIONS"].pop(option, None) 405 406 for setting in db["default"]["OPTIONS"].keys(): 407 override = config.get( 408 f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET 409 ) 410 if override is not UNSET: 411 _database["OPTIONS"][setting] = override 412 413 _database["OPTIONS"].update(all_replica_conn_options) 414 replica_conn_options = config.get_dict_from_b64_json( 415 f"postgresql.read_replicas.{replica}.conn_options", default={} 416 ) 417 _database["OPTIONS"].update(replica_conn_options) 418 419 db[f"replica_{replica}"] = _database 420 return db