authentik.tenants.flags

  1from collections.abc import Callable
  2from copy import copy
  3from functools import wraps
  4from typing import TYPE_CHECKING, Any, Literal
  5
  6from django.db import DatabaseError, InternalError, ProgrammingError
  7from django.db.models import F, Func, JSONField, Value
  8
  9from authentik.lib.utils.reflection import all_subclasses
 10
 11if TYPE_CHECKING:
 12    from authentik.tenants.models import Tenant
 13
 14
 15class Flag[T]:
 16    default: T | None = None
 17    visibility: (
 18        Literal["none"] | Literal["public"] | Literal["authenticated"] | Literal["system"]
 19    ) = "none"
 20    description: str | None = None
 21    deprecated = False
 22
 23    def __init_subclass__(cls, key: str, **kwargs):
 24        cls.__key = key
 25
 26    @property
 27    def key(self) -> str:
 28        return self.__key
 29
 30    @classmethod
 31    def get(cls, tenant: Tenant | None = None) -> T | None:
 32        from authentik.tenants.utils import get_current_tenant
 33
 34        if not tenant:
 35            tenant = get_current_tenant(["flags"])
 36
 37        flags = {}
 38        try:
 39            flags: dict[str, Any] = tenant.flags
 40        except DatabaseError, ProgrammingError, InternalError:
 41            pass
 42        value = flags.get(cls.__key, None)
 43        if value is None:
 44            return cls().get_default()
 45        return value
 46
 47    @classmethod
 48    def set(cls, value: T, tenant: Tenant | None = None) -> T | None:
 49        from authentik.tenants.models import Tenant
 50        from authentik.tenants.utils import get_current_tenant
 51
 52        if not tenant:
 53            tenant = get_current_tenant()
 54
 55        Tenant.objects.filter(pk=tenant.pk).update(
 56            flags=Func(
 57                F("flags"),
 58                Value([cls.__key]),
 59                Value(value, JSONField()),
 60                function="jsonb_set",
 61            )
 62        )
 63
 64    def get_default(self) -> T | None:
 65        return self.default
 66
 67    @staticmethod
 68    def available(
 69        visibility: Literal["none"] | Literal["public"] | Literal["authenticated"] | None = None,
 70        exclude_system=True,
 71    ):
 72        flags = all_subclasses(Flag)
 73        for flag in flags:
 74            if visibility and flag.visibility != visibility:
 75                continue
 76            if exclude_system and flag.visibility == "system":
 77                continue
 78            yield flag
 79
 80
 81def patch_flag[T](flag: Flag[T], value: T):
 82    """Decorator for tests to set a flag to a value"""
 83
 84    def wrapper_outer(func: Callable):
 85        """Set a flag for a test"""
 86        from authentik.tenants.utils import get_current_tenant
 87
 88        def cleanup(tenant: Tenant, flags: dict[str, Any]):
 89            tenant.flags = flags
 90            tenant.save()
 91
 92        @wraps(func)
 93        def wrapper(*args, **kwargs):
 94            tenant = get_current_tenant()
 95            old_flags = copy(tenant.flags)
 96            tenant.flags[flag().key] = value
 97            tenant.save()
 98            try:
 99                res = func(*args, **kwargs)
100                cleanup(tenant, old_flags)
101                return res
102            finally:
103                cleanup(tenant, old_flags)
104
105        return wrapper
106
107    return wrapper_outer
class Flag(typing.Generic[T]):
16class Flag[T]:
17    default: T | None = None
18    visibility: (
19        Literal["none"] | Literal["public"] | Literal["authenticated"] | Literal["system"]
20    ) = "none"
21    description: str | None = None
22    deprecated = False
23
24    def __init_subclass__(cls, key: str, **kwargs):
25        cls.__key = key
26
27    @property
28    def key(self) -> str:
29        return self.__key
30
31    @classmethod
32    def get(cls, tenant: Tenant | None = None) -> T | None:
33        from authentik.tenants.utils import get_current_tenant
34
35        if not tenant:
36            tenant = get_current_tenant(["flags"])
37
38        flags = {}
39        try:
40            flags: dict[str, Any] = tenant.flags
41        except DatabaseError, ProgrammingError, InternalError:
42            pass
43        value = flags.get(cls.__key, None)
44        if value is None:
45            return cls().get_default()
46        return value
47
48    @classmethod
49    def set(cls, value: T, tenant: Tenant | None = None) -> T | None:
50        from authentik.tenants.models import Tenant
51        from authentik.tenants.utils import get_current_tenant
52
53        if not tenant:
54            tenant = get_current_tenant()
55
56        Tenant.objects.filter(pk=tenant.pk).update(
57            flags=Func(
58                F("flags"),
59                Value([cls.__key]),
60                Value(value, JSONField()),
61                function="jsonb_set",
62            )
63        )
64
65    def get_default(self) -> T | None:
66        return self.default
67
68    @staticmethod
69    def available(
70        visibility: Literal["none"] | Literal["public"] | Literal["authenticated"] | None = None,
71        exclude_system=True,
72    ):
73        flags = all_subclasses(Flag)
74        for flag in flags:
75            if visibility and flag.visibility != visibility:
76                continue
77            if exclude_system and flag.visibility == "system":
78                continue
79            yield flag

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
default: T | None = None
visibility: Literal['none'] | Literal['public'] | Literal['authenticated'] | Literal['system'] = 'none'
description: str | None = None
deprecated = False
key: str
27    @property
28    def key(self) -> str:
29        return self.__key
@classmethod
def get(unknown):
31    @classmethod
32    def get(cls, tenant: Tenant | None = None) -> T | None:
33        from authentik.tenants.utils import get_current_tenant
34
35        if not tenant:
36            tenant = get_current_tenant(["flags"])
37
38        flags = {}
39        try:
40            flags: dict[str, Any] = tenant.flags
41        except DatabaseError, ProgrammingError, InternalError:
42            pass
43        value = flags.get(cls.__key, None)
44        if value is None:
45            return cls().get_default()
46        return value
@classmethod
def set(unknown):
48    @classmethod
49    def set(cls, value: T, tenant: Tenant | None = None) -> T | None:
50        from authentik.tenants.models import Tenant
51        from authentik.tenants.utils import get_current_tenant
52
53        if not tenant:
54            tenant = get_current_tenant()
55
56        Tenant.objects.filter(pk=tenant.pk).update(
57            flags=Func(
58                F("flags"),
59                Value([cls.__key]),
60                Value(value, JSONField()),
61                function="jsonb_set",
62            )
63        )
def get_default(self) -> T | None:
65    def get_default(self) -> T | None:
66        return self.default
@staticmethod
def available( visibility: Literal['none'] | Literal['public'] | Literal['authenticated'] | None = None, exclude_system=True):
68    @staticmethod
69    def available(
70        visibility: Literal["none"] | Literal["public"] | Literal["authenticated"] | None = None,
71        exclude_system=True,
72    ):
73        flags = all_subclasses(Flag)
74        for flag in flags:
75            if visibility and flag.visibility != visibility:
76                continue
77            if exclude_system and flag.visibility == "system":
78                continue
79            yield flag
def patch_flag(flag: Flag[T], value: T):
 82def patch_flag[T](flag: Flag[T], value: T):
 83    """Decorator for tests to set a flag to a value"""
 84
 85    def wrapper_outer(func: Callable):
 86        """Set a flag for a test"""
 87        from authentik.tenants.utils import get_current_tenant
 88
 89        def cleanup(tenant: Tenant, flags: dict[str, Any]):
 90            tenant.flags = flags
 91            tenant.save()
 92
 93        @wraps(func)
 94        def wrapper(*args, **kwargs):
 95            tenant = get_current_tenant()
 96            old_flags = copy(tenant.flags)
 97            tenant.flags[flag().key] = value
 98            tenant.save()
 99            try:
100                res = func(*args, **kwargs)
101                cleanup(tenant, old_flags)
102                return res
103            finally:
104                cleanup(tenant, old_flags)
105
106        return wrapper
107
108    return wrapper_outer

Decorator for tests to set a flag to a value