authentik.lib.config

authentik core config loader

  1"""authentik core config loader"""
  2
  3import base64
  4import json
  5import os
  6from collections.abc import Mapping
  7from contextlib import contextmanager
  8from copy import deepcopy
  9from dataclasses import dataclass, field
 10from enum import Enum
 11from glob import glob
 12from json import JSONEncoder, dumps, loads
 13from json.decoder import JSONDecodeError
 14from pathlib import Path
 15from sys import argv, stderr
 16from time import time
 17from typing import Any
 18from urllib.parse import urlparse
 19
 20import yaml
 21from django.conf import ImproperlyConfigured
 22
 23from authentik.lib.utils.dict import delete_path_in_dict, get_path_from_dict, set_path_in_dict
 24
 25SEARCH_PATHS = ["authentik/lib/default.yml", "/etc/authentik/config.yml", ""] + glob(
 26    "/etc/authentik/config.d/*.yml", recursive=True
 27)
 28ENV_PREFIX = "AUTHENTIK"
 29ENVIRONMENT = os.getenv(f"{ENV_PREFIX}_ENV", "local")
 30
 31# Old key -> new key
 32DEPRECATIONS = {
 33    "geoip": "events.context_processors.geoip",
 34    "worker.concurrency": "worker.threads",
 35}
 36
 37
 38@dataclass(slots=True)
 39class Attr:
 40    """Single configuration attribute"""
 41
 42    class Source(Enum):
 43        """Sources a configuration attribute can come from, determines what should be done with
 44        Attr.source (and if it's set at all)"""
 45
 46        UNSPECIFIED = "unspecified"
 47        ENV = "env"
 48        CONFIG_FILE = "config_file"
 49        URI = "uri"
 50
 51    value: Any
 52
 53    source_type: Source = field(default=Source.UNSPECIFIED)
 54
 55    # depending on source_type, might contain the environment variable or the path
 56    # to the config file containing this change or the file containing this value
 57    source: str | None = field(default=None)
 58
 59    def __post_init__(self):
 60        if isinstance(self.value, Attr):
 61            raise RuntimeError(f"config Attr with nested Attr for source {self.source}")
 62
 63
 64class AttrEncoder(JSONEncoder):
 65    """JSON encoder that can deal with `Attr` classes"""
 66
 67    def default(self, o: Any) -> Any:
 68        if isinstance(o, Attr):
 69            return o.value
 70        return super().default(o)
 71
 72
class UNSET:
    """Used to test whether configuration key has not been set.

    The class object itself (never an instance) is passed as the `default` of a
    lookup and the result is compared by identity (`is UNSET`), which tells a
    missing key apart from one whose value is None."""
 75
 76
 77class ConfigLoader:
 78    """Search through SEARCH_PATHS and load configuration. Environment variables starting with
 79    `ENV_PREFIX` are also applied.
 80
 81    A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host"""
 82
 83    deprecations: dict[tuple[str, str], str] = {}
 84
 85    def __init__(self, **kwargs):
 86        super().__init__()
 87        self.__config = {}
 88        base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve()
 89        for _path in SEARCH_PATHS:
 90            path = Path(_path)
 91            # Check if path is relative, and if so join with base_dir
 92            if not path.is_absolute():
 93                path = base_dir / path
 94            if path.is_file() and path.exists():
 95                # Path is an existing file, so we just read it and update our config with it
 96                self.update_from_file(path)
 97            elif path.is_dir() and path.exists():
 98                # Path is an existing dir, so we try to read the env config from it
 99                env_paths = [
100                    path / Path(ENVIRONMENT + ".yml"),
101                    path / Path(ENVIRONMENT + ".env.yml"),
102                    path / Path(ENVIRONMENT + ".yaml"),
103                    path / Path(ENVIRONMENT + ".env.yaml"),
104                ]
105                for env_file in env_paths:
106                    if env_file.is_file() and env_file.exists():
107                        # Update config with env file
108                        self.update_from_file(env_file)
109        self.update_from_env()
110        self.update(self.__config, kwargs)
111        self.deprecations = self.check_deprecations()
112
113    def check_deprecations(self) -> dict[str, str]:
114        """Warn if any deprecated configuration options are used"""
115
116        def _pop_deprecated_key(current_obj, dot_parts, index):
117            """Recursive function to remove deprecated keys in configuration"""
118            dot_part = dot_parts[index]
119            if index == len(dot_parts) - 1:
120                return current_obj.pop(dot_part)
121            value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1)
122            if not current_obj[dot_part]:
123                current_obj.pop(dot_part)
124            return value
125
126        deprecation_replacements = {}
127        for deprecation, replacement in DEPRECATIONS.items():
128            if self.get(deprecation, default=UNSET) is UNSET:
129                continue
130            message = (
131                f"'{deprecation}' has been deprecated in favor of '{replacement}'! "
132                + "Please update your configuration."
133            )
134            self.log(
135                "warning",
136                message,
137            )
138            deprecation_replacements[(deprecation, replacement)] = message
139
140            deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0)
141            self.set(replacement, deprecated_attr)
142        return deprecation_replacements
143
144    def log(self, level: str, message: str, **kwargs):
145        """Custom Log method, we want to ensure ConfigLoader always logs JSON even when
146        'structlog' or 'logging' hasn't been configured yet."""
147        output = {
148            "event": message,
149            "level": level,
150            "logger": self.__class__.__module__,
151            "timestamp": time(),
152        }
153        output.update(kwargs)
154        print(dumps(output), file=stderr)
155
156    def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]:
157        """Recursively update dictionary"""
158        for key, raw_value in updatee.items():
159            if isinstance(raw_value, Mapping):
160                root[key] = self.update(root.get(key, {}), raw_value)
161            else:
162                if isinstance(raw_value, str):
163                    value = self.parse_uri(raw_value)
164                elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str):
165                    value = self.parse_uri(raw_value.value)
166                elif not isinstance(raw_value, Attr):
167                    value = Attr(raw_value)
168                else:
169                    value = raw_value
170                root[key] = value
171        return root
172
173    def refresh(self, key: str, default=None, sep=".") -> Any:
174        """Update a single value"""
175        attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default))
176        if attr.source_type != Attr.Source.URI:
177            return attr.value
178        attr.value = self.parse_uri(attr.source).value
179        return attr.value
180
181    def parse_uri(self, value: str) -> Attr:
182        """Parse string values which start with a URI"""
183        url = urlparse(value)
184        parsed_value = value
185        if url.scheme == "env":
186            parsed_value = os.getenv(url.netloc, url.query)
187        if url.scheme == "file":
188            try:
189                with open(url.path, encoding="utf8") as _file:
190                    parsed_value = _file.read().strip()
191            except OSError as exc:
192                self.log("error", f"Failed to read config value from {url.path}: {exc}")
193                parsed_value = url.query
194        return Attr(parsed_value, Attr.Source.URI, value)
195
196    def update_from_file(self, path: Path):
197        """Update config from file contents"""
198        try:
199            with open(path, encoding="utf8") as file:
200                try:
201                    self.update(self.__config, yaml.safe_load(file))
202                    self.log("debug", "Loaded config", file=str(path))
203                except yaml.YAMLError as exc:
204                    raise ImproperlyConfigured from exc
205        except PermissionError as exc:
206            self.log(
207                "warning",
208                "Permission denied while reading file",
209                path=path,
210                error=str(exc),
211            )
212
213    def update_from_dict(self, update: dict):
214        """Update config from dict"""
215        self.__config.update(update)
216
217    def update_from_env(self):
218        """Check environment variables"""
219        outer = {}
220        idx = 0
221        for key, value in os.environ.items():
222            if not key.startswith(ENV_PREFIX):
223                continue
224            relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower()
225            # Check if the value is json, and try to load it
226            try:
227                value = loads(value)  # noqa: PLW2901
228            except JSONDecodeError:
229                pass
230            attr_value = Attr(value, Attr.Source.ENV, relative_key)
231            set_path_in_dict(outer, relative_key, attr_value)
232            idx += 1
233        if idx > 0:
234            self.log("debug", "Loaded environment variables", count=idx)
235            self.update(self.__config, outer)
236
237    @contextmanager
238    def patch(self, path: str, value: Any):
239        """Context manager for unittests to patch a value"""
240        original_value = self.get(path, UNSET)
241        self.set(path, value)
242        try:
243            yield
244        finally:
245            if original_value is not UNSET:
246                self.set(path, original_value)
247            else:
248                self.delete(path)
249
250    @property
251    def raw(self) -> dict:
252        """Get raw config dictionary"""
253        return self.__config
254
255    def get(self, path: str, default=None, sep=".") -> Any:
256        """Access attribute by using yaml path"""
257        # Walk sub_dicts before parsing path
258        root = self.raw
259        # Walk each component of the path
260        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default))
261        return attr.value
262
263    def get_int(self, path: str, default=0) -> int:
264        """Wrapper for get that converts value into int"""
265        try:
266            return int(self.get(path, default))
267        except ValueError as exc:
268            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
269            return default
270
271    def get_optional_int(self, path: str, default=None) -> int | None:
272        """Wrapper for get that converts value into int or None if set"""
273        value = self.get(path, UNSET)
274        if value is UNSET:
275            return default
276        try:
277            return int(value)
278        except (ValueError, TypeError) as exc:
279            if value is None or (isinstance(value, str) and value.lower() == "null"):
280                return None
281            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
282            return default
283
284    def get_bool(self, path: str, default=False) -> bool:
285        """Wrapper for get that converts value into boolean"""
286        value = self.get(path, UNSET)
287        if value is UNSET:
288            return default
289        return str(self.get(path)).lower() == "true"
290
291    def get_keys(self, path: str, sep=".") -> list[str]:
292        """List attribute keys by using yaml path"""
293        root = self.raw
294        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({}))
295        return attr.keys()
296
297    def get_dict_from_b64_json(self, path: str, default=None) -> dict:
298        """Wrapper for get that converts value from Base64 encoded string into dictionary"""
299        config_value = self.get(path)
300        if config_value is None:
301            return {}
302        try:
303            b64decoded_str = base64.b64decode(config_value).decode("utf-8")
304            return json.loads(b64decoded_str)
305        except (JSONDecodeError, TypeError, ValueError) as exc:
306            self.log(
307                "warning",
308                f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}",
309            )
310            return default if isinstance(default, dict) else {}
311
312    def set(self, path: str, value: Any, sep="."):
313        """Set value using same syntax as get()"""
314        if not isinstance(value, Attr):
315            value = Attr(value)
316        set_path_in_dict(self.raw, path, value, sep=sep)
317
318    def delete(self, path: str, sep="."):
319        delete_path_in_dict(self.raw, path, sep=sep)
320
321
# Module-level loader instance; constructing it here means all configuration
# sources are read when this module is imported. django_db_config() falls back
# to this instance when it is called without an explicit loader.
CONFIG = ConfigLoader()
323
324
def django_db_config(config: ConfigLoader | None = None) -> dict:
    """Build the database settings dict from the `postgresql.*` configuration keys.

    The result has a `default` entry plus one `replica_<name>` entry for every key
    found under `postgresql.read_replicas`. Uses the module-level CONFIG when no
    loader is passed."""
    if not config:
        config = CONFIG

    pool_options = False
    use_pool = config.get_bool("postgresql.use_pool", False)
    if use_pool:
        pool_options = config.get_dict_from_b64_json("postgresql.pool_options", True)
        if not pool_options:
            pool_options = True
    # FIXME: Temporarily force pool to be deactivated.
    # See https://github.com/goauthentik/authentik/issues/14320
    # (this makes the pool_options computed above irrelevant for now)
    pool_options = False

    conn_options = config.get_dict_from_b64_json("postgresql.conn_options", default={})

    db = {
        "default": {
            "ENGINE": "psqlextra.backend",
            "HOST": config.get("postgresql.host"),
            "NAME": config.get("postgresql.name"),
            "USER": config.get("postgresql.user"),
            "PASSWORD": config.get("postgresql.password"),
            "PORT": config.get("postgresql.port"),
            "OPTIONS": {
                "sslmode": config.get("postgresql.sslmode"),
                "sslrootcert": config.get("postgresql.sslrootcert"),
                "sslcert": config.get("postgresql.sslcert"),
                "sslkey": config.get("postgresql.sslkey"),
                "pool": pool_options,
                # conn_options come last, so they win over the keys above
                **conn_options,
            },
            "CONN_MAX_AGE": config.get_optional_int("postgresql.conn_max_age", 0),
            "CONN_HEALTH_CHECKS": config.get_bool("postgresql.conn_health_checks", False),
            "DISABLE_SERVER_SIDE_CURSORS": config.get_bool(
                "postgresql.disable_server_side_cursors", False
            ),
            "TEST": {
                "NAME": config.get("postgresql.test.name"),
            },
        }
    }

    # Read both settings again with UNSET as default, to tell "explicitly configured"
    # apart from "not set"; explicit values take precedence over the pooler defaults below
    conn_max_age = config.get_optional_int("postgresql.conn_max_age", UNSET)
    disable_server_side_cursors = config.get_bool("postgresql.disable_server_side_cursors", UNSET)
    if config.get_bool("postgresql.use_pgpool", False):
        db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True
        if disable_server_side_cursors is not UNSET:
            db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors

    if config.get_bool("postgresql.use_pgbouncer", False):
        # https://docs.djangoproject.com/en/4.0/ref/databases/#transaction-pooling-server-side-cursors
        db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = True
        # https://docs.djangoproject.com/en/4.0/ref/databases/#persistent-connections
        db["default"]["CONN_MAX_AGE"] = None  # persistent
        if disable_server_side_cursors is not UNSET:
            db["default"]["DISABLE_SERVER_SIDE_CURSORS"] = disable_server_side_cursors
        if conn_max_age is not UNSET:
            db["default"]["CONN_MAX_AGE"] = conn_max_age

    # Connection options applied to every read replica
    all_replica_conn_options = config.get_dict_from_b64_json(
        "postgresql.replica_conn_options",
        default={},
    )

    for replica in config.get_keys("postgresql.read_replicas"):
        # Each replica starts out as a copy of the default database settings
        _database = deepcopy(db["default"])

        # Top-level settings: the override is looked up under the lowercased setting
        # name; dict-valued settings (OPTIONS, TEST) are skipped here
        for setting, current_value in db["default"].items():
            if isinstance(current_value, dict):
                continue
            override = config.get(
                f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET
            )
            if override is not UNSET:
                _database[setting] = override

        # The conn_options of the default database are not carried over to replicas
        for option in conn_options.keys():
            _database["OPTIONS"].pop(option, None)

        # OPTIONS entries are overridden from the same per-replica namespace
        for setting in db["default"]["OPTIONS"].keys():
            override = config.get(
                f"postgresql.read_replicas.{replica}.{setting.lower()}", default=UNSET
            )
            if override is not UNSET:
                _database["OPTIONS"][setting] = override

        # Options for all replicas first, then this replica's own conn_options on top
        _database["OPTIONS"].update(all_replica_conn_options)
        replica_conn_options = config.get_dict_from_b64_json(
            f"postgresql.read_replicas.{replica}.conn_options", default={}
        )
        _database["OPTIONS"].update(replica_conn_options)

        db[f"replica_{replica}"] = _database
    return db
420
421
if __name__ == "__main__":
    # Without arguments dump the whole configuration as JSON, otherwise print
    # the value of every key given on the command line
    requested_keys = argv[1:]
    if not requested_keys:
        print(dumps(CONFIG.raw, indent=4, cls=AttrEncoder))
    for requested_key in requested_keys:
        print(CONFIG.get(requested_key))
SEARCH_PATHS = ['authentik/lib/default.yml', '/etc/authentik/config.yml', '']
ENV_PREFIX = 'AUTHENTIK'
ENVIRONMENT = 'local'
DEPRECATIONS = {'geoip': 'events.context_processors.geoip', 'worker.concurrency': 'worker.threads'}
@dataclass(slots=True)
class Attr:
39@dataclass(slots=True)
40class Attr:
41    """Single configuration attribute"""
42
43    class Source(Enum):
44        """Sources a configuration attribute can come from, determines what should be done with
45        Attr.source (and if it's set at all)"""
46
47        UNSPECIFIED = "unspecified"
48        ENV = "env"
49        CONFIG_FILE = "config_file"
50        URI = "uri"
51
52    value: Any
53
54    source_type: Source = field(default=Source.UNSPECIFIED)
55
56    # depending on source_type, might contain the environment variable or the path
57    # to the config file containing this change or the file containing this value
58    source: str | None = field(default=None)
59
60    def __post_init__(self):
61        if isinstance(self.value, Attr):
62            raise RuntimeError(f"config Attr with nested Attr for source {self.source}")

Single configuration attribute

Attr( value: Any, source_type: Attr.Source = <Source.UNSPECIFIED: 'unspecified'>, source: str | None = None)
value: Any
source_type: Attr.Source
source: str | None
class Attr.Source(enum.Enum):
43    class Source(Enum):
44        """Sources a configuration attribute can come from, determines what should be done with
45        Attr.source (and if it's set at all)"""
46
47        UNSPECIFIED = "unspecified"
48        ENV = "env"
49        CONFIG_FILE = "config_file"
50        URI = "uri"

Sources a configuration attribute can come from, determines what should be done with Attr.source (and if it's set at all)

UNSPECIFIED = <Source.UNSPECIFIED: 'unspecified'>
ENV = <Source.ENV: 'env'>
CONFIG_FILE = <Source.CONFIG_FILE: 'config_file'>
URI = <Source.URI: 'uri'>
class AttrEncoder(json.encoder.JSONEncoder):
65class AttrEncoder(JSONEncoder):
66    """JSON encoder that can deal with `Attr` classes"""
67
68    def default(self, o: Any) -> Any:
69        if isinstance(o, Attr):
70            return o.value
71        return super().default(o)

JSON encoder that can deal with Attr classes

def default(self, o: Any) -> Any:
68    def default(self, o: Any) -> Any:
69        if isinstance(o, Attr):
70            return o.value
71        return super().default(o)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
class UNSET:
74class UNSET:
75    """Used to test whether configuration key has not been set."""

Used to test whether configuration key has not been set.

class ConfigLoader:
 78class ConfigLoader:
 79    """Search through SEARCH_PATHS and load configuration. Environment variables starting with
 80    `ENV_PREFIX` are also applied.
 81
 82    A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host"""
 83
 84    deprecations: dict[tuple[str, str], str] = {}
 85
 86    def __init__(self, **kwargs):
 87        super().__init__()
 88        self.__config = {}
 89        base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve()
 90        for _path in SEARCH_PATHS:
 91            path = Path(_path)
 92            # Check if path is relative, and if so join with base_dir
 93            if not path.is_absolute():
 94                path = base_dir / path
 95            if path.is_file() and path.exists():
 96                # Path is an existing file, so we just read it and update our config with it
 97                self.update_from_file(path)
 98            elif path.is_dir() and path.exists():
 99                # Path is an existing dir, so we try to read the env config from it
100                env_paths = [
101                    path / Path(ENVIRONMENT + ".yml"),
102                    path / Path(ENVIRONMENT + ".env.yml"),
103                    path / Path(ENVIRONMENT + ".yaml"),
104                    path / Path(ENVIRONMENT + ".env.yaml"),
105                ]
106                for env_file in env_paths:
107                    if env_file.is_file() and env_file.exists():
108                        # Update config with env file
109                        self.update_from_file(env_file)
110        self.update_from_env()
111        self.update(self.__config, kwargs)
112        self.deprecations = self.check_deprecations()
113
114    def check_deprecations(self) -> dict[str, str]:
115        """Warn if any deprecated configuration options are used"""
116
117        def _pop_deprecated_key(current_obj, dot_parts, index):
118            """Recursive function to remove deprecated keys in configuration"""
119            dot_part = dot_parts[index]
120            if index == len(dot_parts) - 1:
121                return current_obj.pop(dot_part)
122            value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1)
123            if not current_obj[dot_part]:
124                current_obj.pop(dot_part)
125            return value
126
127        deprecation_replacements = {}
128        for deprecation, replacement in DEPRECATIONS.items():
129            if self.get(deprecation, default=UNSET) is UNSET:
130                continue
131            message = (
132                f"'{deprecation}' has been deprecated in favor of '{replacement}'! "
133                + "Please update your configuration."
134            )
135            self.log(
136                "warning",
137                message,
138            )
139            deprecation_replacements[(deprecation, replacement)] = message
140
141            deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0)
142            self.set(replacement, deprecated_attr)
143        return deprecation_replacements
144
145    def log(self, level: str, message: str, **kwargs):
146        """Custom Log method, we want to ensure ConfigLoader always logs JSON even when
147        'structlog' or 'logging' hasn't been configured yet."""
148        output = {
149            "event": message,
150            "level": level,
151            "logger": self.__class__.__module__,
152            "timestamp": time(),
153        }
154        output.update(kwargs)
155        print(dumps(output), file=stderr)
156
157    def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]:
158        """Recursively update dictionary"""
159        for key, raw_value in updatee.items():
160            if isinstance(raw_value, Mapping):
161                root[key] = self.update(root.get(key, {}), raw_value)
162            else:
163                if isinstance(raw_value, str):
164                    value = self.parse_uri(raw_value)
165                elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str):
166                    value = self.parse_uri(raw_value.value)
167                elif not isinstance(raw_value, Attr):
168                    value = Attr(raw_value)
169                else:
170                    value = raw_value
171                root[key] = value
172        return root
173
174    def refresh(self, key: str, default=None, sep=".") -> Any:
175        """Update a single value"""
176        attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default))
177        if attr.source_type != Attr.Source.URI:
178            return attr.value
179        attr.value = self.parse_uri(attr.source).value
180        return attr.value
181
182    def parse_uri(self, value: str) -> Attr:
183        """Parse string values which start with a URI"""
184        url = urlparse(value)
185        parsed_value = value
186        if url.scheme == "env":
187            parsed_value = os.getenv(url.netloc, url.query)
188        if url.scheme == "file":
189            try:
190                with open(url.path, encoding="utf8") as _file:
191                    parsed_value = _file.read().strip()
192            except OSError as exc:
193                self.log("error", f"Failed to read config value from {url.path}: {exc}")
194                parsed_value = url.query
195        return Attr(parsed_value, Attr.Source.URI, value)
196
197    def update_from_file(self, path: Path):
198        """Update config from file contents"""
199        try:
200            with open(path, encoding="utf8") as file:
201                try:
202                    self.update(self.__config, yaml.safe_load(file))
203                    self.log("debug", "Loaded config", file=str(path))
204                except yaml.YAMLError as exc:
205                    raise ImproperlyConfigured from exc
206        except PermissionError as exc:
207            self.log(
208                "warning",
209                "Permission denied while reading file",
210                path=path,
211                error=str(exc),
212            )
213
214    def update_from_dict(self, update: dict):
215        """Update config from dict"""
216        self.__config.update(update)
217
218    def update_from_env(self):
219        """Check environment variables"""
220        outer = {}
221        idx = 0
222        for key, value in os.environ.items():
223            if not key.startswith(ENV_PREFIX):
224                continue
225            relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower()
226            # Check if the value is json, and try to load it
227            try:
228                value = loads(value)  # noqa: PLW2901
229            except JSONDecodeError:
230                pass
231            attr_value = Attr(value, Attr.Source.ENV, relative_key)
232            set_path_in_dict(outer, relative_key, attr_value)
233            idx += 1
234        if idx > 0:
235            self.log("debug", "Loaded environment variables", count=idx)
236            self.update(self.__config, outer)
237
238    @contextmanager
239    def patch(self, path: str, value: Any):
240        """Context manager for unittests to patch a value"""
241        original_value = self.get(path, UNSET)
242        self.set(path, value)
243        try:
244            yield
245        finally:
246            if original_value is not UNSET:
247                self.set(path, original_value)
248            else:
249                self.delete(path)
250
251    @property
252    def raw(self) -> dict:
253        """Get raw config dictionary"""
254        return self.__config
255
256    def get(self, path: str, default=None, sep=".") -> Any:
257        """Access attribute by using yaml path"""
258        # Walk sub_dicts before parsing path
259        root = self.raw
260        # Walk each component of the path
261        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default))
262        return attr.value
263
264    def get_int(self, path: str, default=0) -> int:
265        """Wrapper for get that converts value into int"""
266        try:
267            return int(self.get(path, default))
268        except ValueError as exc:
269            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
270            return default
271
272    def get_optional_int(self, path: str, default=None) -> int | None:
273        """Wrapper for get that converts value into int or None if set"""
274        value = self.get(path, UNSET)
275        if value is UNSET:
276            return default
277        try:
278            return int(value)
279        except (ValueError, TypeError) as exc:
280            if value is None or (isinstance(value, str) and value.lower() == "null"):
281                return None
282            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
283            return default
284
285    def get_bool(self, path: str, default=False) -> bool:
286        """Wrapper for get that converts value into boolean"""
287        value = self.get(path, UNSET)
288        if value is UNSET:
289            return default
290        return str(self.get(path)).lower() == "true"
291
292    def get_keys(self, path: str, sep=".") -> list[str]:
293        """List attribute keys by using yaml path"""
294        root = self.raw
295        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({}))
296        return attr.keys()
297
298    def get_dict_from_b64_json(self, path: str, default=None) -> dict:
299        """Wrapper for get that converts value from Base64 encoded string into dictionary"""
300        config_value = self.get(path)
301        if config_value is None:
302            return {}
303        try:
304            b64decoded_str = base64.b64decode(config_value).decode("utf-8")
305            return json.loads(b64decoded_str)
306        except (JSONDecodeError, TypeError, ValueError) as exc:
307            self.log(
308                "warning",
309                f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}",
310            )
311            return default if isinstance(default, dict) else {}
312
313    def set(self, path: str, value: Any, sep="."):
314        """Set value using same syntax as get()"""
315        if not isinstance(value, Attr):
316            value = Attr(value)
317        set_path_in_dict(self.raw, path, value, sep=sep)
318
319    def delete(self, path: str, sep="."):
320        delete_path_in_dict(self.raw, path, sep=sep)

Search through SEARCH_PATHS and load configuration. Environment variables starting with ENV_PREFIX are also applied.

A variable like AUTHENTIK_POSTGRESQL__HOST would translate to postgresql.host

ConfigLoader(**kwargs)
 86    def __init__(self, **kwargs):
 87        super().__init__()
 88        self.__config = {}
 89        base_dir = Path(__file__).parent.joinpath(Path("../..")).resolve()
 90        for _path in SEARCH_PATHS:
 91            path = Path(_path)
 92            # Check if path is relative, and if so join with base_dir
 93            if not path.is_absolute():
 94                path = base_dir / path
 95            if path.is_file() and path.exists():
 96                # Path is an existing file, so we just read it and update our config with it
 97                self.update_from_file(path)
 98            elif path.is_dir() and path.exists():
 99                # Path is an existing dir, so we try to read the env config from it
100                env_paths = [
101                    path / Path(ENVIRONMENT + ".yml"),
102                    path / Path(ENVIRONMENT + ".env.yml"),
103                    path / Path(ENVIRONMENT + ".yaml"),
104                    path / Path(ENVIRONMENT + ".env.yaml"),
105                ]
106                for env_file in env_paths:
107                    if env_file.is_file() and env_file.exists():
108                        # Update config with env file
109                        self.update_from_file(env_file)
110        self.update_from_env()
111        self.update(self.__config, kwargs)
112        self.deprecations = self.check_deprecations()
deprecations: dict[tuple[str, str], str] = {}
def check_deprecations(self) -> dict[str, str]:
114    def check_deprecations(self) -> dict[str, str]:
115        """Warn if any deprecated configuration options are used"""
116
117        def _pop_deprecated_key(current_obj, dot_parts, index):
118            """Recursive function to remove deprecated keys in configuration"""
119            dot_part = dot_parts[index]
120            if index == len(dot_parts) - 1:
121                return current_obj.pop(dot_part)
122            value = _pop_deprecated_key(current_obj[dot_part], dot_parts, index + 1)
123            if not current_obj[dot_part]:
124                current_obj.pop(dot_part)
125            return value
126
127        deprecation_replacements = {}
128        for deprecation, replacement in DEPRECATIONS.items():
129            if self.get(deprecation, default=UNSET) is UNSET:
130                continue
131            message = (
132                f"'{deprecation}' has been deprecated in favor of '{replacement}'! "
133                + "Please update your configuration."
134            )
135            self.log(
136                "warning",
137                message,
138            )
139            deprecation_replacements[(deprecation, replacement)] = message
140
141            deprecated_attr = _pop_deprecated_key(self.__config, deprecation.split("."), 0)
142            self.set(replacement, deprecated_attr)
143        return deprecation_replacements

Warn if any deprecated configuration options are used

def log(self, level: str, message: str, **kwargs):
145    def log(self, level: str, message: str, **kwargs):
146        """Custom Log method, we want to ensure ConfigLoader always logs JSON even when
147        'structlog' or 'logging' hasn't been configured yet."""
148        output = {
149            "event": message,
150            "level": level,
151            "logger": self.__class__.__module__,
152            "timestamp": time(),
153        }
154        output.update(kwargs)
155        print(dumps(output), file=stderr)

Custom log method: ConfigLoader must always log JSON, even when 'structlog' or 'logging' hasn't been configured yet.

def update( self, root: dict[str, typing.Any], updatee: dict[str, typing.Any]) -> dict[str, typing.Any]:
157    def update(self, root: dict[str, Any], updatee: dict[str, Any]) -> dict[str, Any]:
158        """Recursively update dictionary"""
159        for key, raw_value in updatee.items():
160            if isinstance(raw_value, Mapping):
161                root[key] = self.update(root.get(key, {}), raw_value)
162            else:
163                if isinstance(raw_value, str):
164                    value = self.parse_uri(raw_value)
165                elif isinstance(raw_value, Attr) and isinstance(raw_value.value, str):
166                    value = self.parse_uri(raw_value.value)
167                elif not isinstance(raw_value, Attr):
168                    value = Attr(raw_value)
169                else:
170                    value = raw_value
171                root[key] = value
172        return root

Recursively update dictionary

def refresh(self, key: str, default=None, sep='.') -> Any:
174    def refresh(self, key: str, default=None, sep=".") -> Any:
175        """Update a single value"""
176        attr: Attr = get_path_from_dict(self.raw, key, sep=sep, default=Attr(default))
177        if attr.source_type != Attr.Source.URI:
178            return attr.value
179        attr.value = self.parse_uri(attr.source).value
180        return attr.value

Update a single value

def parse_uri(self, value: str) -> Attr:
182    def parse_uri(self, value: str) -> Attr:
183        """Parse string values which start with a URI"""
184        url = urlparse(value)
185        parsed_value = value
186        if url.scheme == "env":
187            parsed_value = os.getenv(url.netloc, url.query)
188        if url.scheme == "file":
189            try:
190                with open(url.path, encoding="utf8") as _file:
191                    parsed_value = _file.read().strip()
192            except OSError as exc:
193                self.log("error", f"Failed to read config value from {url.path}: {exc}")
194                parsed_value = url.query
195        return Attr(parsed_value, Attr.Source.URI, value)

Parse string values which start with a URI

def update_from_file(self, path: pathlib.Path):
197    def update_from_file(self, path: Path):
198        """Update config from file contents"""
199        try:
200            with open(path, encoding="utf8") as file:
201                try:
202                    self.update(self.__config, yaml.safe_load(file))
203                    self.log("debug", "Loaded config", file=str(path))
204                except yaml.YAMLError as exc:
205                    raise ImproperlyConfigured from exc
206        except PermissionError as exc:
207            self.log(
208                "warning",
209                "Permission denied while reading file",
210                path=path,
211                error=str(exc),
212            )

Update config from file contents

def update_from_dict(self, update: dict):
214    def update_from_dict(self, update: dict):
215        """Update config from dict"""
216        self.__config.update(update)

Update config from dict

def update_from_env(self):
218    def update_from_env(self):
219        """Check environment variables"""
220        outer = {}
221        idx = 0
222        for key, value in os.environ.items():
223            if not key.startswith(ENV_PREFIX):
224                continue
225            relative_key = key.replace(f"{ENV_PREFIX}_", "", 1).replace("__", ".").lower()
226            # Check if the value is json, and try to load it
227            try:
228                value = loads(value)  # noqa: PLW2901
229            except JSONDecodeError:
230                pass
231            attr_value = Attr(value, Attr.Source.ENV, relative_key)
232            set_path_in_dict(outer, relative_key, attr_value)
233            idx += 1
234        if idx > 0:
235            self.log("debug", "Loaded environment variables", count=idx)
236            self.update(self.__config, outer)

Check environment variables

@contextmanager
def patch(self, path: str, value: Any):
238    @contextmanager
239    def patch(self, path: str, value: Any):
240        """Context manager for unittests to patch a value"""
241        original_value = self.get(path, UNSET)
242        self.set(path, value)
243        try:
244            yield
245        finally:
246            if original_value is not UNSET:
247                self.set(path, original_value)
248            else:
249                self.delete(path)

Context manager for unittests to patch a value

raw: dict
251    @property
252    def raw(self) -> dict:
253        """Get raw config dictionary"""
254        return self.__config

Get raw config dictionary

def get(self, path: str, default=None, sep='.') -> Any:
256    def get(self, path: str, default=None, sep=".") -> Any:
257        """Access attribute by using yaml path"""
258        # Walk sub_dicts before parsing path
259        root = self.raw
260        # Walk each component of the path
261        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr(default))
262        return attr.value

Access attribute by using yaml path

def get_int(self, path: str, default=0) -> int:
264    def get_int(self, path: str, default=0) -> int:
265        """Wrapper for get that converts value into int"""
266        try:
267            return int(self.get(path, default))
268        except ValueError as exc:
269            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
270            return default

Wrapper for get that converts value into int

def get_optional_int(self, path: str, default=None) -> int | None:
272    def get_optional_int(self, path: str, default=None) -> int | None:
273        """Wrapper for get that converts value into int or None if set"""
274        value = self.get(path, UNSET)
275        if value is UNSET:
276            return default
277        try:
278            return int(value)
279        except (ValueError, TypeError) as exc:
280            if value is None or (isinstance(value, str) and value.lower() == "null"):
281                return None
282            self.log("warning", "Failed to parse config as int", path=path, exc=str(exc))
283            return default

Wrapper for get that converts the value into an int, or returns None when the value is explicitly set to null

def get_bool(self, path: str, default=False) -> bool:
285    def get_bool(self, path: str, default=False) -> bool:
286        """Wrapper for get that converts value into boolean"""
287        value = self.get(path, UNSET)
288        if value is UNSET:
289            return default
290        return str(self.get(path)).lower() == "true"

Wrapper for get that converts value into boolean

def get_keys(self, path: str, sep='.') -> list[str]:
292    def get_keys(self, path: str, sep=".") -> list[str]:
293        """List attribute keys by using yaml path"""
294        root = self.raw
295        attr: Attr = get_path_from_dict(root, path, sep=sep, default=Attr({}))
296        return attr.keys()

List attribute keys by using yaml path

def get_dict_from_b64_json(self, path: str, default=None) -> dict:
298    def get_dict_from_b64_json(self, path: str, default=None) -> dict:
299        """Wrapper for get that converts value from Base64 encoded string into dictionary"""
300        config_value = self.get(path)
301        if config_value is None:
302            return {}
303        try:
304            b64decoded_str = base64.b64decode(config_value).decode("utf-8")
305            return json.loads(b64decoded_str)
306        except (JSONDecodeError, TypeError, ValueError) as exc:
307            self.log(
308                "warning",
309                f"Ignored invalid configuration for '{path}' due to exception: {str(exc)}",
310            )
311            return default if isinstance(default, dict) else {}

Wrapper for get that converts value from Base64 encoded string into dictionary

def set(self, path: str, value: Any, sep='.'):
313    def set(self, path: str, value: Any, sep="."):
314        """Set value using same syntax as get()"""
315        if not isinstance(value, Attr):
316            value = Attr(value)
317        set_path_in_dict(self.raw, path, value, sep=sep)

Set value using same syntax as get()

def delete(self, path: str, sep='.'):
319    def delete(self, path: str, sep="."):
320        delete_path_in_dict(self.raw, path, sep=sep)
CONFIG = <ConfigLoader object>
def django_db_config(config: ConfigLoader | None = None) -> dict:
def django_db_config(config: ConfigLoader | None = None) -> dict:
    """Build the Django database settings from the ``postgresql.*`` configuration.

    `config` defaults to the module-level CONFIG. The returned dict always contains
    a "default" entry and one ``replica_<name>`` entry per key found under
    ``postgresql.read_replicas``. A replica starts as a copy of the default entry,
    without the default's extra connection options, and then receives its own
    overrides and connection options.
    """
    if not config:
        config = CONFIG

    pool_options = False
    if config.get_bool("postgresql.use_pool", False):
        pool_options = config.get_dict_from_b64_json("postgresql.pool_options", True) or True
    # FIXME: Temporarily force pool to be deactivated.
    # See https://github.com/goauthentik/authentik/issues/14320
    pool_options = False

    conn_options = config.get_dict_from_b64_json("postgresql.conn_options", default={})

    ssl_options = {
        name: config.get(f"postgresql.{name}")
        for name in ("sslmode", "sslrootcert", "sslcert", "sslkey")
    }
    primary = {
        "ENGINE": "psqlextra.backend",
        "HOST": config.get("postgresql.host"),
        "NAME": config.get("postgresql.name"),
        "USER": config.get("postgresql.user"),
        "PASSWORD": config.get("postgresql.password"),
        "PORT": config.get("postgresql.port"),
        # Extra connection options are applied last, so they win over the keys above
        "OPTIONS": {**ssl_options, "pool": pool_options, **conn_options},
        "CONN_MAX_AGE": config.get_optional_int("postgresql.conn_max_age", 0),
        "CONN_HEALTH_CHECKS": config.get_bool("postgresql.conn_health_checks", False),
        "DISABLE_SERVER_SIDE_CURSORS": config.get_bool(
            "postgresql.disable_server_side_cursors", False
        ),
        "TEST": {
            "NAME": config.get("postgresql.test.name"),
        },
    }
    db = {"default": primary}

    # UNSET here means "not configured", so the pooler defaults below may apply
    conn_max_age = config.get_optional_int("postgresql.conn_max_age", UNSET)
    disable_server_side_cursors = config.get_bool("postgresql.disable_server_side_cursors", UNSET)
    cursors_explicit = disable_server_side_cursors is not UNSET

    if config.get_bool("postgresql.use_pgpool", False):
        primary["DISABLE_SERVER_SIDE_CURSORS"] = (
            disable_server_side_cursors if cursors_explicit else True
        )

    if config.get_bool("postgresql.use_pgbouncer", False):
        # https://docs.djangoproject.com/en/4.0/ref/databases/#transaction-pooling-server-side-cursors
        primary["DISABLE_SERVER_SIDE_CURSORS"] = (
            disable_server_side_cursors if cursors_explicit else True
        )
        # https://docs.djangoproject.com/en/4.0/ref/databases/#persistent-connections
        # None means persistent connections
        primary["CONN_MAX_AGE"] = None if conn_max_age is UNSET else conn_max_age

    all_replica_conn_options = config.get_dict_from_b64_json(
        "postgresql.replica_conn_options",
        default={},
    )

    for replica in config.get_keys("postgresql.read_replicas"):
        replica_prefix = f"postgresql.read_replicas.{replica}"
        replica_db = deepcopy(primary)

        # Scalar top-level settings can be overridden per replica
        for setting, current_value in primary.items():
            if isinstance(current_value, dict):
                continue
            override = config.get(f"{replica_prefix}.{setting.lower()}", default=UNSET)
            if override is not UNSET:
                replica_db[setting] = override

        # The default database's extra connection options are not inherited
        for option in conn_options.keys():
            replica_db["OPTIONS"].pop(option, None)

        for setting in primary["OPTIONS"].keys():
            override = config.get(f"{replica_prefix}.{setting.lower()}", default=UNSET)
            if override is not UNSET:
                replica_db["OPTIONS"][setting] = override

        # Options shared by all replicas first, then the replica's own options
        replica_db["OPTIONS"].update(all_replica_conn_options)
        replica_conn_options = config.get_dict_from_b64_json(
            f"{replica_prefix}.conn_options", default={}
        )
        replica_db["OPTIONS"].update(replica_conn_options)

        db[f"replica_{replica}"] = replica_db
    return db