authentik.tenants.api.settings
Serializer for tenants models
1"""Serializer for tenants models""" 2 3from typing import get_args 4 5from django.utils.translation import gettext_lazy as _ 6from django_tenants.utils import get_public_schema_name 7from drf_spectacular.extensions import OpenApiSerializerFieldExtension 8from drf_spectacular.plumbing import build_basic_type, build_object_type 9from rest_framework.exceptions import ValidationError 10from rest_framework.fields import JSONField 11from rest_framework.generics import RetrieveUpdateAPIView 12from rest_framework.permissions import SAFE_METHODS 13 14from authentik.core.api.utils import JSONDictField, ModelSerializer 15from authentik.rbac.permissions import HasPermission 16from authentik.tenants.flags import Flag 17from authentik.tenants.models import Tenant 18 19 20class FlagJSONField(JSONDictField): 21 22 def to_internal_value(self, data: str): 23 flags = super().to_internal_value(data) 24 for flag in Flag.available(visibility="system", exclude_system=False): 25 flags[flag().key] = flag.get() 26 return flags 27 28 def to_representation(self, value: dict) -> dict: 29 new_value = value.copy() 30 for flag in Flag.available(exclude_system=False): 31 _flag = flag() 32 # Exclude any system flags that aren't modifiable 33 if _flag.visibility == "system": 34 new_value.pop(_flag.key, None) 35 # Explicitly present unset flags as if they were set to default 36 if _flag.key not in value: 37 value[_flag.key] = _flag.default 38 return super().to_representation(new_value) 39 40 def run_validators(self, value: dict): 41 super().run_validators(value) 42 for flag in Flag.available(): 43 _flag = flag() 44 if _flag.key not in value: 45 continue 46 flag_value = value.get(_flag.key) 47 flag_type = get_args(_flag.__orig_bases__[0])[0] 48 if flag_value and not isinstance(flag_value, flag_type): 49 raise ValidationError( 50 _("Value for flag {flag_key} needs to be of type {type}.").format( 51 flag_key=_flag.key, type=flag_type.__name__ 52 ) 53 ) 54 55 56class FlagsJSONExtension(OpenApiSerializerFieldExtension): 57 """Generate API Schema for JSON fields as""" 58 59 target_class = "authentik.tenants.api.settings.FlagJSONField" 60 61 def map_serializer_field(self, auto_schema, direction): 62 props = {} 63 for flag in Flag.available(): 64 _flag = flag() 65 props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0]) 66 if _flag.description: 67 props[_flag.key]["description"] = _flag.description 68 if _flag.deprecated: 69 props[_flag.key]["deprecated"] = _flag.deprecated 70 return build_object_type(props, required=props.keys()) 71 72 73class SettingsSerializer(ModelSerializer): 74 """Settings Serializer""" 75 76 footer_links = JSONField(required=False) 77 flags = FlagJSONField() 78 79 class Meta: 80 model = Tenant 81 fields = [ 82 "avatars", 83 "default_user_change_name", 84 "default_user_change_email", 85 "default_user_change_username", 86 "event_retention", 87 "reputation_lower_limit", 88 "reputation_upper_limit", 89 "footer_links", 90 "gdpr_compliance", 91 "impersonation", 92 "impersonation_require_reason", 93 "default_token_duration", 94 "default_token_length", 95 "pagination_default_page_size", 96 "pagination_max_page_size", 97 "flags", 98 ] 99 100 101class SettingsView(RetrieveUpdateAPIView): 102 """Settings view""" 103 104 queryset = Tenant.objects.filter(ready=True) 105 serializer_class = SettingsSerializer 106 filter_backends = [] 107 108 def get_permissions(self): 109 return [ 110 HasPermission( 111 "authentik_rbac.view_system_settings" 112 if self.request.method in SAFE_METHODS 113 else "authentik_rbac.edit_system_settings" 114 )() 115 ] 116 117 def get_object(self): 118 obj = self.request.tenant 119 self.check_object_permissions(self.request, obj) 120 return obj 121 122 def perform_update(self, serializer): 123 # We need to be in the public schema to actually modify a tenant 124 with Tenant.objects.get(schema_name=get_public_schema_name()): 125 super().perform_update(serializer)
21class FlagJSONField(JSONDictField): 22 23 def to_internal_value(self, data: str): 24 flags = super().to_internal_value(data) 25 for flag in Flag.available(visibility="system", exclude_system=False): 26 flags[flag().key] = flag.get() 27 return flags 28 29 def to_representation(self, value: dict) -> dict: 30 new_value = value.copy() 31 for flag in Flag.available(exclude_system=False): 32 _flag = flag() 33 # Exclude any system flags that aren't modifiable 34 if _flag.visibility == "system": 35 new_value.pop(_flag.key, None) 36 # Explicitly present unset flags as if they were set to default 37 if _flag.key not in value: 38 value[_flag.key] = _flag.default 39 return super().to_representation(new_value) 40 41 def run_validators(self, value: dict): 42 super().run_validators(value) 43 for flag in Flag.available(): 44 _flag = flag() 45 if _flag.key not in value: 46 continue 47 flag_value = value.get(_flag.key) 48 flag_type = get_args(_flag.__orig_bases__[0])[0] 49 if flag_value and not isinstance(flag_value, flag_type): 50 raise ValidationError( 51 _("Value for flag {flag_key} needs to be of type {type}.").format( 52 flag_key=_flag.key, type=flag_type.__name__ 53 ) 54 )
JSON Field which only allows dictionaries
def
to_internal_value(self, data: str):
23 def to_internal_value(self, data: str): 24 flags = super().to_internal_value(data) 25 for flag in Flag.available(visibility="system", exclude_system=False): 26 flags[flag().key] = flag.get() 27 return flags
Transform the incoming primitive data into a native value.
def
to_representation(self, value: dict) -> dict:
29 def to_representation(self, value: dict) -> dict: 30 new_value = value.copy() 31 for flag in Flag.available(exclude_system=False): 32 _flag = flag() 33 # Exclude any system flags that aren't modifiable 34 if _flag.visibility == "system": 35 new_value.pop(_flag.key, None) 36 # Explicitly present unset flags as if they were set to default 37 if _flag.key not in value: 38 value[_flag.key] = _flag.default 39 return super().to_representation(new_value)
Transform the outgoing native value into primitive data.
def
run_validators(self, value: dict):
41 def run_validators(self, value: dict): 42 super().run_validators(value) 43 for flag in Flag.available(): 44 _flag = flag() 45 if _flag.key not in value: 46 continue 47 flag_value = value.get(_flag.key) 48 flag_type = get_args(_flag.__orig_bases__[0])[0] 49 if flag_value and not isinstance(flag_value, flag_type): 50 raise ValidationError( 51 _("Value for flag {flag_key} needs to be of type {type}.").format( 52 flag_key=_flag.key, type=flag_type.__name__ 53 ) 54 )
Test the given value against all the validators on the field,
and either raise a ValidationError or simply return.
Inherited Members
class
FlagsJSONExtension(drf_spectacular.plumbing.OpenApiGeneratorExtension[ForwardRef('OpenApiSerializerFieldExtension')]):
57class FlagsJSONExtension(OpenApiSerializerFieldExtension): 58 """Generate API Schema for JSON fields as""" 59 60 target_class = "authentik.tenants.api.settings.FlagJSONField" 61 62 def map_serializer_field(self, auto_schema, direction): 63 props = {} 64 for flag in Flag.available(): 65 _flag = flag() 66 props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0]) 67 if _flag.description: 68 props[_flag.key]["description"] = _flag.description 69 if _flag.deprecated: 70 props[_flag.key]["deprecated"] = _flag.deprecated 71 return build_object_type(props, required=props.keys())
Generate API Schema for JSON fields as
target_class =
'FlagJSONField'
def
map_serializer_field(self, auto_schema, direction):
62 def map_serializer_field(self, auto_schema, direction): 63 props = {} 64 for flag in Flag.available(): 65 _flag = flag() 66 props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0]) 67 if _flag.description: 68 props[_flag.key]["description"] = _flag.description 69 if _flag.deprecated: 70 props[_flag.key]["deprecated"] = _flag.deprecated 71 return build_object_type(props, required=props.keys())
override for customized serializer field mapping
74class SettingsSerializer(ModelSerializer): 75 """Settings Serializer""" 76 77 footer_links = JSONField(required=False) 78 flags = FlagJSONField() 79 80 class Meta: 81 model = Tenant 82 fields = [ 83 "avatars", 84 "default_user_change_name", 85 "default_user_change_email", 86 "default_user_change_username", 87 "event_retention", 88 "reputation_lower_limit", 89 "reputation_upper_limit", 90 "footer_links", 91 "gdpr_compliance", 92 "impersonation", 93 "impersonation_require_reason", 94 "default_token_duration", 95 "default_token_length", 96 "pagination_default_page_size", 97 "pagination_max_page_size", 98 "flags", 99 ]
Settings Serializer
Inherited Members
class
SettingsSerializer.Meta:
80 class Meta: 81 model = Tenant 82 fields = [ 83 "avatars", 84 "default_user_change_name", 85 "default_user_change_email", 86 "default_user_change_username", 87 "event_retention", 88 "reputation_lower_limit", 89 "reputation_upper_limit", 90 "footer_links", 91 "gdpr_compliance", 92 "impersonation", 93 "impersonation_require_reason", 94 "default_token_duration", 95 "default_token_length", 96 "pagination_default_page_size", 97 "pagination_max_page_size", 98 "flags", 99 ]
model =
<class 'authentik.tenants.models.Tenant'>
fields =
['avatars', 'default_user_change_name', 'default_user_change_email', 'default_user_change_username', 'event_retention', 'reputation_lower_limit', 'reputation_upper_limit', 'footer_links', 'gdpr_compliance', 'impersonation', 'impersonation_require_reason', 'default_token_duration', 'default_token_length', 'pagination_default_page_size', 'pagination_max_page_size', 'flags']
class
SettingsView(rest_framework.generics.RetrieveUpdateAPIView):
102class SettingsView(RetrieveUpdateAPIView): 103 """Settings view""" 104 105 queryset = Tenant.objects.filter(ready=True) 106 serializer_class = SettingsSerializer 107 filter_backends = [] 108 109 def get_permissions(self): 110 return [ 111 HasPermission( 112 "authentik_rbac.view_system_settings" 113 if self.request.method in SAFE_METHODS 114 else "authentik_rbac.edit_system_settings" 115 )() 116 ] 117 118 def get_object(self): 119 obj = self.request.tenant 120 self.check_object_permissions(self.request, obj) 121 return obj 122 123 def perform_update(self, serializer): 124 # We need to be in the public schema to actually modify a tenant 125 with Tenant.objects.get(schema_name=get_public_schema_name()): 126 super().perform_update(serializer)
Settings view
serializer_class =
<class 'SettingsSerializer'>
def
get_permissions(self):
109 def get_permissions(self): 110 return [ 111 HasPermission( 112 "authentik_rbac.view_system_settings" 113 if self.request.method in SAFE_METHODS 114 else "authentik_rbac.edit_system_settings" 115 )() 116 ]
Instantiates and returns the list of permissions that this view requires.
def
get_object(self):
118 def get_object(self): 119 obj = self.request.tenant 120 self.check_object_permissions(self.request, obj) 121 return obj
Returns the object the view is displaying.
You may want to override this if you need to provide non-standard queryset lookups. Eg if objects are referenced using multiple keyword arguments in the url conf.