authentik.tenants.api.settings

Serializer for tenants models

  1"""Serializer for tenants models"""
  2
  3from typing import get_args
  4
  5from django.utils.translation import gettext_lazy as _
  6from django_tenants.utils import get_public_schema_name
  7from drf_spectacular.extensions import OpenApiSerializerFieldExtension
  8from drf_spectacular.plumbing import build_basic_type, build_object_type
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import JSONField
 11from rest_framework.generics import RetrieveUpdateAPIView
 12from rest_framework.permissions import SAFE_METHODS
 13
 14from authentik.core.api.utils import JSONDictField, ModelSerializer
 15from authentik.rbac.permissions import HasPermission
 16from authentik.tenants.flags import Flag
 17from authentik.tenants.models import Tenant
 18
 19
 20class FlagJSONField(JSONDictField):
 21
 22    def run_validators(self, value: dict):
 23        super().run_validators(value)
 24        for flag in Flag.available():
 25            _flag = flag()
 26            if _flag.key in value:
 27                flag_value = value.get(_flag.key)
 28                flag_type = get_args(_flag.__orig_bases__[0])[0]
 29                if flag_value and not isinstance(flag_value, flag_type):
 30                    raise ValidationError(
 31                        _("Value for flag {flag_key} needs to be of type {type}.").format(
 32                            flag_key=_flag.key, type=flag_type.__name__
 33                        )
 34                    )
 35
 36
 37class FlagsJSONExtension(OpenApiSerializerFieldExtension):
 38    """Generate API Schema for JSON fields as"""
 39
 40    target_class = "authentik.tenants.api.settings.FlagJSONField"
 41
 42    def map_serializer_field(self, auto_schema, direction):
 43        props = {}
 44        for flag in Flag.available():
 45            _flag = flag()
 46            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
 47            if _flag.description:
 48                props[_flag.key]["description"] = _flag.description
 49        return build_object_type(props, required=props.keys())
 50
 51
 52class SettingsSerializer(ModelSerializer):
 53    """Settings Serializer"""
 54
 55    footer_links = JSONField(required=False)
 56    flags = FlagJSONField()
 57
 58    class Meta:
 59        model = Tenant
 60        fields = [
 61            "avatars",
 62            "default_user_change_name",
 63            "default_user_change_email",
 64            "default_user_change_username",
 65            "event_retention",
 66            "reputation_lower_limit",
 67            "reputation_upper_limit",
 68            "footer_links",
 69            "gdpr_compliance",
 70            "impersonation",
 71            "impersonation_require_reason",
 72            "default_token_duration",
 73            "default_token_length",
 74            "pagination_default_page_size",
 75            "pagination_max_page_size",
 76            "flags",
 77        ]
 78
 79
 80class SettingsView(RetrieveUpdateAPIView):
 81    """Settings view"""
 82
 83    queryset = Tenant.objects.filter(ready=True)
 84    serializer_class = SettingsSerializer
 85    filter_backends = []
 86
 87    def get_permissions(self):
 88        return [
 89            HasPermission(
 90                "authentik_rbac.view_system_settings"
 91                if self.request.method in SAFE_METHODS
 92                else "authentik_rbac.edit_system_settings"
 93            )()
 94        ]
 95
 96    def get_object(self):
 97        obj = self.request.tenant
 98        self.check_object_permissions(self.request, obj)
 99        return obj
100
101    def perform_update(self, serializer):
102        # We need to be in the public schema to actually modify a tenant
103        with Tenant.objects.get(schema_name=get_public_schema_name()):
104            super().perform_update(serializer)
class FlagJSONField(authentik.core.api.utils.JSONDictField):
21class FlagJSONField(JSONDictField):
22
23    def run_validators(self, value: dict):
24        super().run_validators(value)
25        for flag in Flag.available():
26            _flag = flag()
27            if _flag.key in value:
28                flag_value = value.get(_flag.key)
29                flag_type = get_args(_flag.__orig_bases__[0])[0]
30                if flag_value and not isinstance(flag_value, flag_type):
31                    raise ValidationError(
32                        _("Value for flag {flag_key} needs to be of type {type}.").format(
33                            flag_key=_flag.key, type=flag_type.__name__
34                        )
35                    )

JSON Field which only allows dictionaries

def run_validators(self, value: dict):
23    def run_validators(self, value: dict):
24        super().run_validators(value)
25        for flag in Flag.available():
26            _flag = flag()
27            if _flag.key in value:
28                flag_value = value.get(_flag.key)
29                flag_type = get_args(_flag.__orig_bases__[0])[0]
30                if flag_value and not isinstance(flag_value, flag_type):
31                    raise ValidationError(
32                        _("Value for flag {flag_key} needs to be of type {type}.").format(
33                            flag_key=_flag.key, type=flag_type.__name__
34                        )
35                    )

Test the given value against all the validators on the field, and either raise a ValidationError or simply return.

class FlagsJSONExtension(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiSerializerFieldExtension')]):
38class FlagsJSONExtension(OpenApiSerializerFieldExtension):
39    """Generate API Schema for JSON fields as"""
40
41    target_class = "authentik.tenants.api.settings.FlagJSONField"
42
43    def map_serializer_field(self, auto_schema, direction):
44        props = {}
45        for flag in Flag.available():
46            _flag = flag()
47            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
48            if _flag.description:
49                props[_flag.key]["description"] = _flag.description
50        return build_object_type(props, required=props.keys())

Generate API Schema for JSON fields as

target_class = 'FlagJSONField'
def map_serializer_field(self, auto_schema, direction):
43    def map_serializer_field(self, auto_schema, direction):
44        props = {}
45        for flag in Flag.available():
46            _flag = flag()
47            props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
48            if _flag.description:
49                props[_flag.key]["description"] = _flag.description
50        return build_object_type(props, required=props.keys())

override for customized serializer field mapping

class SettingsSerializer(authentik.core.api.utils.ModelSerializer):
53class SettingsSerializer(ModelSerializer):
54    """Settings Serializer"""
55
56    footer_links = JSONField(required=False)
57    flags = FlagJSONField()
58
59    class Meta:
60        model = Tenant
61        fields = [
62            "avatars",
63            "default_user_change_name",
64            "default_user_change_email",
65            "default_user_change_username",
66            "event_retention",
67            "reputation_lower_limit",
68            "reputation_upper_limit",
69            "footer_links",
70            "gdpr_compliance",
71            "impersonation",
72            "impersonation_require_reason",
73            "default_token_duration",
74            "default_token_length",
75            "pagination_default_page_size",
76            "pagination_max_page_size",
77            "flags",
78        ]

Settings Serializer

flags
class SettingsSerializer.Meta:
59    class Meta:
60        model = Tenant
61        fields = [
62            "avatars",
63            "default_user_change_name",
64            "default_user_change_email",
65            "default_user_change_username",
66            "event_retention",
67            "reputation_lower_limit",
68            "reputation_upper_limit",
69            "footer_links",
70            "gdpr_compliance",
71            "impersonation",
72            "impersonation_require_reason",
73            "default_token_duration",
74            "default_token_length",
75            "pagination_default_page_size",
76            "pagination_max_page_size",
77            "flags",
78        ]
fields = ['avatars', 'default_user_change_name', 'default_user_change_email', 'default_user_change_username', 'event_retention', 'reputation_lower_limit', 'reputation_upper_limit', 'footer_links', 'gdpr_compliance', 'impersonation', 'impersonation_require_reason', 'default_token_duration', 'default_token_length', 'pagination_default_page_size', 'pagination_max_page_size', 'flags']
class SettingsView(rest_framework.generics.RetrieveUpdateAPIView):
 81class SettingsView(RetrieveUpdateAPIView):
 82    """Settings view"""
 83
 84    queryset = Tenant.objects.filter(ready=True)
 85    serializer_class = SettingsSerializer
 86    filter_backends = []
 87
 88    def get_permissions(self):
 89        return [
 90            HasPermission(
 91                "authentik_rbac.view_system_settings"
 92                if self.request.method in SAFE_METHODS
 93                else "authentik_rbac.edit_system_settings"
 94            )()
 95        ]
 96
 97    def get_object(self):
 98        obj = self.request.tenant
 99        self.check_object_permissions(self.request, obj)
100        return obj
101
102    def perform_update(self, serializer):
103        # We need to be in the public schema to actually modify a tenant
104        with Tenant.objects.get(schema_name=get_public_schema_name()):
105            super().perform_update(serializer)

Settings view

queryset = <QuerySet [<Tenant: Tenant Default>]>
serializer_class = <class 'SettingsSerializer'>
filter_backends = []
def get_permissions(self):
88    def get_permissions(self):
89        return [
90            HasPermission(
91                "authentik_rbac.view_system_settings"
92                if self.request.method in SAFE_METHODS
93                else "authentik_rbac.edit_system_settings"
94            )()
95        ]

Instantiates and returns the list of permissions that this view requires.

def get_object(self):
 97    def get_object(self):
 98        obj = self.request.tenant
 99        self.check_object_permissions(self.request, obj)
100        return obj

Returns the object the view is displaying.

You may want to override this if you need to provide non-standard queryset lookups. Eg if objects are referenced using multiple keyword arguments in the url conf.

def perform_update(self, serializer):
102    def perform_update(self, serializer):
103        # We need to be in the public schema to actually modify a tenant
104        with Tenant.objects.get(schema_name=get_public_schema_name()):
105            super().perform_update(serializer)