authentik.tenants.api.settings

Serializer for tenants models

  1"""Serializer for tenants models"""
  2
  3from typing import get_args
  4
  5from django.utils.translation import gettext_lazy as _
  6from django_tenants.utils import get_public_schema_name
  7from drf_spectacular.extensions import OpenApiSerializerFieldExtension
  8from drf_spectacular.plumbing import build_basic_type, build_object_type
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import JSONField
 11from rest_framework.generics import RetrieveUpdateAPIView
 12from rest_framework.permissions import SAFE_METHODS
 13
 14from authentik.core.api.utils import JSONDictField, ModelSerializer
 15from authentik.rbac.permissions import HasPermission
 16from authentik.tenants.flags import Flag
 17from authentik.tenants.models import Tenant
 18
 19
 20class FlagJSONField(JSONDictField):
 21
 22    def to_internal_value(self, data: str):
 23        flags = super().to_internal_value(data)
 24        for flag in Flag.available(visibility="system", exclude_system=False):
 25            flags[flag().key] = flag.get()
 26        return flags
 27
 28    def to_representation(self, value: dict) -> dict:
 29        new_value = value.copy()
 30        for flag in Flag.available(exclude_system=False):
 31            _flag = flag()
 32            # Exclude any system flags that aren't modifiable
 33            if _flag.visibility == "system":
 34                new_value.pop(_flag.key, None)
 35            # Explicitly present unset flags as if they were set to default
 36            if _flag.key not in value:
 37                value[_flag.key] = _flag.default
 38        return super().to_representation(new_value)
 39
 40    def run_validators(self, value: dict):
 41        super().run_validators(value)
 42        for flag in Flag.available():
 43            _flag = flag()
 44            if _flag.key not in value:
 45                continue
 46            flag_value = value.get(_flag.key)
 47            flag_type = get_args(_flag.__orig_bases__[0])[0]
 48            if flag_value and not isinstance(flag_value, flag_type):
 49                raise ValidationError(
 50                    _("Value for flag {flag_key} needs to be of type {type}.").format(
 51                        flag_key=_flag.key, type=flag_type.__name__
 52                    )
 53                )
 54
 55
 56class FlagsJSONExtension(OpenApiSerializerFieldExtension):
 57    """Generate API Schema for JSON fields as"""
 58
 59    target_class = "authentik.tenants.api.settings.FlagJSONField"
 60
 61    def map_serializer_field(self, auto_schema, direction):
 62        props = {}
 63        for flag in Flag.available():
 64            _flag = flag()
 65            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
 66            if _flag.description:
 67                props[_flag.key]["description"] = _flag.description
 68            if _flag.deprecated:
 69                props[_flag.key]["deprecated"] = _flag.deprecated
 70        return build_object_type(props, required=props.keys())
 71
 72
 73class SettingsSerializer(ModelSerializer):
 74    """Settings Serializer"""
 75
 76    footer_links = JSONField(required=False)
 77    flags = FlagJSONField()
 78
 79    class Meta:
 80        model = Tenant
 81        fields = [
 82            "avatars",
 83            "default_user_change_name",
 84            "default_user_change_email",
 85            "default_user_change_username",
 86            "event_retention",
 87            "reputation_lower_limit",
 88            "reputation_upper_limit",
 89            "footer_links",
 90            "gdpr_compliance",
 91            "impersonation",
 92            "impersonation_require_reason",
 93            "default_token_duration",
 94            "default_token_length",
 95            "pagination_default_page_size",
 96            "pagination_max_page_size",
 97            "flags",
 98        ]
 99
100
101class SettingsView(RetrieveUpdateAPIView):
102    """Settings view"""
103
104    queryset = Tenant.objects.filter(ready=True)
105    serializer_class = SettingsSerializer
106    filter_backends = []
107
108    def get_permissions(self):
109        return [
110            HasPermission(
111                "authentik_rbac.view_system_settings"
112                if self.request.method in SAFE_METHODS
113                else "authentik_rbac.edit_system_settings"
114            )()
115        ]
116
117    def get_object(self):
118        obj = self.request.tenant
119        self.check_object_permissions(self.request, obj)
120        return obj
121
122    def perform_update(self, serializer):
123        # We need to be in the public schema to actually modify a tenant
124        with Tenant.objects.get(schema_name=get_public_schema_name()):
125            super().perform_update(serializer)
class FlagJSONField(authentik.core.api.utils.JSONDictField):
21class FlagJSONField(JSONDictField):
22
23    def to_internal_value(self, data: str):
24        flags = super().to_internal_value(data)
25        for flag in Flag.available(visibility="system", exclude_system=False):
26            flags[flag().key] = flag.get()
27        return flags
28
29    def to_representation(self, value: dict) -> dict:
30        new_value = value.copy()
31        for flag in Flag.available(exclude_system=False):
32            _flag = flag()
33            # Exclude any system flags that aren't modifiable
34            if _flag.visibility == "system":
35                new_value.pop(_flag.key, None)
36            # Explicitly present unset flags as if they were set to default
37            if _flag.key not in value:
38                value[_flag.key] = _flag.default
39        return super().to_representation(new_value)
40
41    def run_validators(self, value: dict):
42        super().run_validators(value)
43        for flag in Flag.available():
44            _flag = flag()
45            if _flag.key not in value:
46                continue
47            flag_value = value.get(_flag.key)
48            flag_type = get_args(_flag.__orig_bases__[0])[0]
49            if flag_value and not isinstance(flag_value, flag_type):
50                raise ValidationError(
51                    _("Value for flag {flag_key} needs to be of type {type}.").format(
52                        flag_key=_flag.key, type=flag_type.__name__
53                    )
54                )

JSON Field which only allows dictionaries

def to_internal_value(self, data: str):
23    def to_internal_value(self, data: str):
24        flags = super().to_internal_value(data)
25        for flag in Flag.available(visibility="system", exclude_system=False):
26            flags[flag().key] = flag.get()
27        return flags

Transform the incoming primitive data into a native value.

def to_representation(self, value: dict) -> dict:
29    def to_representation(self, value: dict) -> dict:
30        new_value = value.copy()
31        for flag in Flag.available(exclude_system=False):
32            _flag = flag()
33            # Exclude any system flags that aren't modifiable
34            if _flag.visibility == "system":
35                new_value.pop(_flag.key, None)
36            # Explicitly present unset flags as if they were set to default
37            if _flag.key not in value:
38                value[_flag.key] = _flag.default
39        return super().to_representation(new_value)

Transform the outgoing native value into primitive data.

def run_validators(self, value: dict):
41    def run_validators(self, value: dict):
42        super().run_validators(value)
43        for flag in Flag.available():
44            _flag = flag()
45            if _flag.key not in value:
46                continue
47            flag_value = value.get(_flag.key)
48            flag_type = get_args(_flag.__orig_bases__[0])[0]
49            if flag_value and not isinstance(flag_value, flag_type):
50                raise ValidationError(
51                    _("Value for flag {flag_key} needs to be of type {type}.").format(
52                        flag_key=_flag.key, type=flag_type.__name__
53                    )
54                )

Test the given value against all the validators on the field, and either raise a ValidationError or simply return.

class FlagsJSONExtension(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiSerializerFieldExtension')]):
57class FlagsJSONExtension(OpenApiSerializerFieldExtension):
58    """Generate API Schema for JSON fields as"""
59
60    target_class = "authentik.tenants.api.settings.FlagJSONField"
61
62    def map_serializer_field(self, auto_schema, direction):
63        props = {}
64        for flag in Flag.available():
65            _flag = flag()
66            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
67            if _flag.description:
68                props[_flag.key]["description"] = _flag.description
69            if _flag.deprecated:
70                props[_flag.key]["deprecated"] = _flag.deprecated
71        return build_object_type(props, required=props.keys())

Generate API Schema for JSON fields as

target_class = 'FlagJSONField'
def map_serializer_field(self, auto_schema, direction):
62    def map_serializer_field(self, auto_schema, direction):
63        props = {}
64        for flag in Flag.available():
65            _flag = flag()
66            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
67            if _flag.description:
68                props[_flag.key]["description"] = _flag.description
69            if _flag.deprecated:
70                props[_flag.key]["deprecated"] = _flag.deprecated
71        return build_object_type(props, required=props.keys())

override for customized serializer field mapping

class SettingsSerializer(authentik.core.api.utils.ModelSerializer):
74class SettingsSerializer(ModelSerializer):
75    """Settings Serializer"""
76
77    footer_links = JSONField(required=False)
78    flags = FlagJSONField()
79
80    class Meta:
81        model = Tenant
82        fields = [
83            "avatars",
84            "default_user_change_name",
85            "default_user_change_email",
86            "default_user_change_username",
87            "event_retention",
88            "reputation_lower_limit",
89            "reputation_upper_limit",
90            "footer_links",
91            "gdpr_compliance",
92            "impersonation",
93            "impersonation_require_reason",
94            "default_token_duration",
95            "default_token_length",
96            "pagination_default_page_size",
97            "pagination_max_page_size",
98            "flags",
99        ]

Settings Serializer

flags
class SettingsSerializer.Meta:
80    class Meta:
81        model = Tenant
82        fields = [
83            "avatars",
84            "default_user_change_name",
85            "default_user_change_email",
86            "default_user_change_username",
87            "event_retention",
88            "reputation_lower_limit",
89            "reputation_upper_limit",
90            "footer_links",
91            "gdpr_compliance",
92            "impersonation",
93            "impersonation_require_reason",
94            "default_token_duration",
95            "default_token_length",
96            "pagination_default_page_size",
97            "pagination_max_page_size",
98            "flags",
99        ]
fields = ['avatars', 'default_user_change_name', 'default_user_change_email', 'default_user_change_username', 'event_retention', 'reputation_lower_limit', 'reputation_upper_limit', 'footer_links', 'gdpr_compliance', 'impersonation', 'impersonation_require_reason', 'default_token_duration', 'default_token_length', 'pagination_default_page_size', 'pagination_max_page_size', 'flags']
class SettingsView(rest_framework.generics.RetrieveUpdateAPIView):
102class SettingsView(RetrieveUpdateAPIView):
103    """Settings view"""
104
105    queryset = Tenant.objects.filter(ready=True)
106    serializer_class = SettingsSerializer
107    filter_backends = []
108
109    def get_permissions(self):
110        return [
111            HasPermission(
112                "authentik_rbac.view_system_settings"
113                if self.request.method in SAFE_METHODS
114                else "authentik_rbac.edit_system_settings"
115            )()
116        ]
117
118    def get_object(self):
119        obj = self.request.tenant
120        self.check_object_permissions(self.request, obj)
121        return obj
122
123    def perform_update(self, serializer):
124        # We need to be in the public schema to actually modify a tenant
125        with Tenant.objects.get(schema_name=get_public_schema_name()):
126            super().perform_update(serializer)

Settings view

queryset = <QuerySet [<Tenant: Tenant Default>]>
serializer_class = <class 'SettingsSerializer'>
filter_backends = []
def get_permissions(self):
109    def get_permissions(self):
110        return [
111            HasPermission(
112                "authentik_rbac.view_system_settings"
113                if self.request.method in SAFE_METHODS
114                else "authentik_rbac.edit_system_settings"
115            )()
116        ]

Instantiates and returns the list of permissions that this view requires.

def get_object(self):
118    def get_object(self):
119        obj = self.request.tenant
120        self.check_object_permissions(self.request, obj)
121        return obj

Returns the object the view is displaying.

You may want to override this if you need to provide non-standard queryset lookups. Eg if objects are referenced using multiple keyword arguments in the url conf.

def perform_update(self, serializer):
123    def perform_update(self, serializer):
124        # We need to be in the public schema to actually modify a tenant
125        with Tenant.objects.get(schema_name=get_public_schema_name()):
126            super().perform_update(serializer)