authentik.blueprints.v1.schema
Generate JSON Schema for blueprints
1"""Generate JSON Schema for blueprints""" 2 3from typing import Any 4 5from django.db.models import Model, fields 6from django.db.models.fields.related import OneToOneField 7from drf_jsonschema_serializer.convert import converter, field_to_converter 8from rest_framework.fields import Field, JSONField, UUIDField 9from rest_framework.relations import PrimaryKeyRelatedField 10from rest_framework.serializers import Serializer 11from structlog.stdlib import get_logger 12 13from authentik import authentik_version 14from authentik.blueprints.v1.common import BlueprintEntryDesiredState 15from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT, is_model_allowed 16from authentik.blueprints.v1.meta.registry import BaseMetaModel, registry 17from authentik.lib.models import SerializerModel 18 19LOGGER = get_logger() 20 21 22@converter 23class PrimaryKeyRelatedFieldConverter: 24 """Custom primary key field converter which is aware of non-integer based PKs 25 26 This is not an exhaustive fix for other non-int PKs, however in authentik we either 27 use UUIDs or ints""" 28 29 field_class = PrimaryKeyRelatedField 30 31 def convert(self, field: PrimaryKeyRelatedField): 32 model: Model = field.queryset.model 33 pk_field = model._meta.pk 34 if isinstance(pk_field, OneToOneField): 35 pk_field = pk_field.related_fields[0][1] 36 if isinstance(pk_field, fields.UUIDField): 37 return {"type": "string", "format": "uuid"} 38 return {"type": "integer"} 39 40 41class SchemaBuilder: 42 """Generate JSON Schema for blueprints""" 43 44 schema: dict 45 46 def __init__(self): 47 self.schema = { 48 "$schema": "http://json-schema.org/draft-07/schema", 49 "$id": "https://goauthentik.io/blueprints/schema.json", 50 "type": "object", 51 "title": f"authentik {authentik_version()} Blueprint schema", 52 "required": ["version", "entries"], 53 "properties": { 54 "version": { 55 "$id": "#/properties/version", 56 "type": "integer", 57 "title": "Blueprint version", 58 "default": 1, 59 }, 60 "metadata": { 61 "$id": "#/properties/metadata", 62 "type": "object", 63 "required": ["name"], 64 "properties": { 65 "name": {"type": "string"}, 66 "labels": {"type": "object", "additionalProperties": {"type": "string"}}, 67 }, 68 }, 69 "context": { 70 "$id": "#/properties/context", 71 "type": "object", 72 "additionalProperties": True, 73 }, 74 "entries": { 75 "anyOf": [ 76 { 77 "type": "array", 78 "items": {"$ref": "#/$defs/blueprint_entry"}, 79 }, 80 { 81 "type": "object", 82 "additionalProperties": { 83 "type": "array", 84 "items": {"$ref": "#/$defs/blueprint_entry"}, 85 }, 86 }, 87 ], 88 }, 89 }, 90 "$defs": {"blueprint_entry": {"oneOf": []}}, 91 } 92 93 @staticmethod 94 def json_default(value: Any) -> Any: 95 """Helper that handles gettext_lazy strings that JSON doesn't handle""" 96 return str(value) 97 98 def build(self): 99 """Build all models into the schema""" 100 for model in registry.get_models(): 101 if issubclass(model, BaseMetaModel): 102 serializer_class = model.serializer() 103 else: 104 if model._meta.abstract: 105 continue 106 if not is_model_allowed(model): 107 continue 108 model_instance: Model = model() 109 if not isinstance(model_instance, SerializerModel): 110 continue 111 try: 112 serializer_class = model_instance.serializer 113 except NotImplementedError as exc: 114 raise ValueError(f"SerializerModel not implemented by {model}") from exc 115 serializer = serializer_class( 116 context={ 117 SERIALIZER_CONTEXT_BLUEPRINT: False, 118 } 119 ) 120 model_path = f"{model._meta.app_label}.{model._meta.model_name}" 121 self.schema["$defs"]["blueprint_entry"]["oneOf"].append( 122 self.template_entry(model_path, model, serializer) 123 ) 124 125 def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict: 126 """Template entry for a single model""" 127 model_schema = self.to_jsonschema(serializer) 128 model_schema["required"] = [] 129 def_name = f"model_{model_path}" 130 def_path = f"#/$defs/{def_name}" 131 self.schema["$defs"][def_name] = model_schema 132 def_name_perm = f"model_{model_path}_permissions" 133 def_path_perm = f"#/$defs/{def_name_perm}" 134 self.schema["$defs"][def_name_perm] = self.model_permissions(model) 135 template = { 136 "type": "object", 137 "required": ["model", "identifiers"], 138 "properties": { 139 "model": {"const": model_path}, 140 "id": {"type": "string"}, 141 "state": { 142 "type": "string", 143 "enum": sorted([s.value for s in BlueprintEntryDesiredState]), 144 "default": "present", 145 }, 146 "conditions": {"type": "array", "items": {"type": "boolean"}}, 147 "permissions": {"$ref": def_path_perm}, 148 "attrs": {"$ref": def_path}, 149 "identifiers": {"$ref": def_path}, 150 }, 151 } 152 # Meta models don't require identifiers, as there's no matching database model to find 153 if issubclass(model, BaseMetaModel): 154 del template["properties"]["identifiers"] 155 template["required"].remove("identifiers") 156 return template 157 158 def field_to_jsonschema(self, field: Field) -> dict: 159 """Convert a single field to json schema""" 160 if isinstance(field, Serializer): 161 result = self.to_jsonschema(field) 162 else: 163 try: 164 converter = field_to_converter[field] 165 result = converter.convert(field) 166 except KeyError: 167 if isinstance(field, JSONField): 168 result = {"type": "object", "additionalProperties": True} 169 elif isinstance(field, UUIDField): 170 result = {"type": "string", "format": "uuid"} 171 else: 172 raise 173 if field.label: 174 result["title"] = field.label 175 if field.help_text: 176 result["description"] = field.help_text 177 return self.clean_result(result) 178 179 def clean_result(self, result: dict) -> dict: 180 """Remove enumNames from result, recursively""" 181 result.pop("enumNames", None) 182 for key, value in result.items(): 183 if isinstance(value, dict): 184 result[key] = self.clean_result(value) 185 return result 186 187 def to_jsonschema(self, serializer: Serializer) -> dict: 188 """Convert serializer to json schema""" 189 properties = {} 190 required = [] 191 for name, field in serializer.fields.items(): 192 if field.read_only: 193 continue 194 sub_schema = self.field_to_jsonschema(field) 195 if field.required: 196 required.append(name) 197 properties[name] = sub_schema 198 199 result = {"type": "object", "properties": properties} 200 if required: 201 result["required"] = required 202 return result 203 204 def model_permissions(self, model: type[Model]) -> dict: 205 perms = [x[0] for x in model._meta.permissions] 206 for action in model._meta.default_permissions: 207 perms.append(f"{action}_{model._meta.model_name}") 208 return { 209 "type": "array", 210 "items": { 211 "type": "object", 212 "required": ["permission"], 213 "properties": { 214 "permission": {"type": "string", "enum": sorted(perms)}, 215 "user": {"type": "integer"}, 216 "role": {"type": "string"}, 217 }, 218 }, 219 }
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@converter
class
PrimaryKeyRelatedFieldConverter:
23@converter 24class PrimaryKeyRelatedFieldConverter: 25 """Custom primary key field converter which is aware of non-integer based PKs 26 27 This is not an exhaustive fix for other non-int PKs, however in authentik we either 28 use UUIDs or ints""" 29 30 field_class = PrimaryKeyRelatedField 31 32 def convert(self, field: PrimaryKeyRelatedField): 33 model: Model = field.queryset.model 34 pk_field = model._meta.pk 35 if isinstance(pk_field, OneToOneField): 36 pk_field = pk_field.related_fields[0][1] 37 if isinstance(pk_field, fields.UUIDField): 38 return {"type": "string", "format": "uuid"} 39 return {"type": "integer"}
Custom primary key field converter which is aware of non-integer based PKs
This is not an exhaustive fix for other non-int PKs, however in authentik we either use UUIDs or ints
def
convert(self, field: rest_framework.relations.PrimaryKeyRelatedField):
32 def convert(self, field: PrimaryKeyRelatedField): 33 model: Model = field.queryset.model 34 pk_field = model._meta.pk 35 if isinstance(pk_field, OneToOneField): 36 pk_field = pk_field.related_fields[0][1] 37 if isinstance(pk_field, fields.UUIDField): 38 return {"type": "string", "format": "uuid"} 39 return {"type": "integer"}
class
SchemaBuilder:
42class SchemaBuilder: 43 """Generate JSON Schema for blueprints""" 44 45 schema: dict 46 47 def __init__(self): 48 self.schema = { 49 "$schema": "http://json-schema.org/draft-07/schema", 50 "$id": "https://goauthentik.io/blueprints/schema.json", 51 "type": "object", 52 "title": f"authentik {authentik_version()} Blueprint schema", 53 "required": ["version", "entries"], 54 "properties": { 55 "version": { 56 "$id": "#/properties/version", 57 "type": "integer", 58 "title": "Blueprint version", 59 "default": 1, 60 }, 61 "metadata": { 62 "$id": "#/properties/metadata", 63 "type": "object", 64 "required": ["name"], 65 "properties": { 66 "name": {"type": "string"}, 67 "labels": {"type": "object", "additionalProperties": {"type": "string"}}, 68 }, 69 }, 70 "context": { 71 "$id": "#/properties/context", 72 "type": "object", 73 "additionalProperties": True, 74 }, 75 "entries": { 76 "anyOf": [ 77 { 78 "type": "array", 79 "items": {"$ref": "#/$defs/blueprint_entry"}, 80 }, 81 { 82 "type": "object", 83 "additionalProperties": { 84 "type": "array", 85 "items": {"$ref": "#/$defs/blueprint_entry"}, 86 }, 87 }, 88 ], 89 }, 90 }, 91 "$defs": {"blueprint_entry": {"oneOf": []}}, 92 } 93 94 @staticmethod 95 def json_default(value: Any) -> Any: 96 """Helper that handles gettext_lazy strings that JSON doesn't handle""" 97 return str(value) 98 99 def build(self): 100 """Build all models into the schema""" 101 for model in registry.get_models(): 102 if issubclass(model, BaseMetaModel): 103 serializer_class = model.serializer() 104 else: 105 if model._meta.abstract: 106 continue 107 if not is_model_allowed(model): 108 continue 109 model_instance: Model = model() 110 if not isinstance(model_instance, SerializerModel): 111 continue 112 try: 113 serializer_class = model_instance.serializer 114 except NotImplementedError as exc: 115 raise ValueError(f"SerializerModel not implemented by {model}") from exc 116 serializer = serializer_class( 117 context={ 118 SERIALIZER_CONTEXT_BLUEPRINT: False, 119 } 120 ) 121 model_path = f"{model._meta.app_label}.{model._meta.model_name}" 122 self.schema["$defs"]["blueprint_entry"]["oneOf"].append( 123 self.template_entry(model_path, model, serializer) 124 ) 125 126 def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict: 127 """Template entry for a single model""" 128 model_schema = self.to_jsonschema(serializer) 129 model_schema["required"] = [] 130 def_name = f"model_{model_path}" 131 def_path = f"#/$defs/{def_name}" 132 self.schema["$defs"][def_name] = model_schema 133 def_name_perm = f"model_{model_path}_permissions" 134 def_path_perm = f"#/$defs/{def_name_perm}" 135 self.schema["$defs"][def_name_perm] = self.model_permissions(model) 136 template = { 137 "type": "object", 138 "required": ["model", "identifiers"], 139 "properties": { 140 "model": {"const": model_path}, 141 "id": {"type": "string"}, 142 "state": { 143 "type": "string", 144 "enum": sorted([s.value for s in BlueprintEntryDesiredState]), 145 "default": "present", 146 }, 147 "conditions": {"type": "array", "items": {"type": "boolean"}}, 148 "permissions": {"$ref": def_path_perm}, 149 "attrs": {"$ref": def_path}, 150 "identifiers": {"$ref": def_path}, 151 }, 152 } 153 # Meta models don't require identifiers, as there's no matching database model to find 154 if issubclass(model, BaseMetaModel): 155 del template["properties"]["identifiers"] 156 template["required"].remove("identifiers") 157 return template 158 159 def field_to_jsonschema(self, field: Field) -> dict: 160 """Convert a single field to json schema""" 161 if isinstance(field, Serializer): 162 result = self.to_jsonschema(field) 163 else: 164 try: 165 converter = field_to_converter[field] 166 result = converter.convert(field) 167 except KeyError: 168 if isinstance(field, JSONField): 169 result = {"type": "object", "additionalProperties": True} 170 elif isinstance(field, UUIDField): 171 result = {"type": "string", "format": "uuid"} 172 else: 173 raise 174 if field.label: 175 result["title"] = field.label 176 if field.help_text: 177 result["description"] = field.help_text 178 return self.clean_result(result) 179 180 def clean_result(self, result: dict) -> dict: 181 """Remove enumNames from result, recursively""" 182 result.pop("enumNames", None) 183 for key, value in result.items(): 184 if isinstance(value, dict): 185 result[key] = self.clean_result(value) 186 return result 187 188 def to_jsonschema(self, serializer: Serializer) -> dict: 189 """Convert serializer to json schema""" 190 properties = {} 191 required = [] 192 for name, field in serializer.fields.items(): 193 if field.read_only: 194 continue 195 sub_schema = self.field_to_jsonschema(field) 196 if field.required: 197 required.append(name) 198 properties[name] = sub_schema 199 200 result = {"type": "object", "properties": properties} 201 if required: 202 result["required"] = required 203 return result 204 205 def model_permissions(self, model: type[Model]) -> dict: 206 perms = [x[0] for x in model._meta.permissions] 207 for action in model._meta.default_permissions: 208 perms.append(f"{action}_{model._meta.model_name}") 209 return { 210 "type": "array", 211 "items": { 212 "type": "object", 213 "required": ["permission"], 214 "properties": { 215 "permission": {"type": "string", "enum": sorted(perms)}, 216 "user": {"type": "integer"}, 217 "role": {"type": "string"}, 218 }, 219 }, 220 }
Generate JSON Schema for blueprints
@staticmethod
def
json_default(value: Any) -> Any:
94 @staticmethod 95 def json_default(value: Any) -> Any: 96 """Helper that handles gettext_lazy strings that JSON doesn't handle""" 97 return str(value)
Helper that handles gettext_lazy strings that JSON doesn't handle
def
build(self):
99 def build(self): 100 """Build all models into the schema""" 101 for model in registry.get_models(): 102 if issubclass(model, BaseMetaModel): 103 serializer_class = model.serializer() 104 else: 105 if model._meta.abstract: 106 continue 107 if not is_model_allowed(model): 108 continue 109 model_instance: Model = model() 110 if not isinstance(model_instance, SerializerModel): 111 continue 112 try: 113 serializer_class = model_instance.serializer 114 except NotImplementedError as exc: 115 raise ValueError(f"SerializerModel not implemented by {model}") from exc 116 serializer = serializer_class( 117 context={ 118 SERIALIZER_CONTEXT_BLUEPRINT: False, 119 } 120 ) 121 model_path = f"{model._meta.app_label}.{model._meta.model_name}" 122 self.schema["$defs"]["blueprint_entry"]["oneOf"].append( 123 self.template_entry(model_path, model, serializer) 124 )
Build all models into the schema
def
template_entry( self, model_path: str, model: type[django.db.models.base.Model], serializer: rest_framework.serializers.Serializer) -> dict:
126 def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict: 127 """Template entry for a single model""" 128 model_schema = self.to_jsonschema(serializer) 129 model_schema["required"] = [] 130 def_name = f"model_{model_path}" 131 def_path = f"#/$defs/{def_name}" 132 self.schema["$defs"][def_name] = model_schema 133 def_name_perm = f"model_{model_path}_permissions" 134 def_path_perm = f"#/$defs/{def_name_perm}" 135 self.schema["$defs"][def_name_perm] = self.model_permissions(model) 136 template = { 137 "type": "object", 138 "required": ["model", "identifiers"], 139 "properties": { 140 "model": {"const": model_path}, 141 "id": {"type": "string"}, 142 "state": { 143 "type": "string", 144 "enum": sorted([s.value for s in BlueprintEntryDesiredState]), 145 "default": "present", 146 }, 147 "conditions": {"type": "array", "items": {"type": "boolean"}}, 148 "permissions": {"$ref": def_path_perm}, 149 "attrs": {"$ref": def_path}, 150 "identifiers": {"$ref": def_path}, 151 }, 152 } 153 # Meta models don't require identifiers, as there's no matching database model to find 154 if issubclass(model, BaseMetaModel): 155 del template["properties"]["identifiers"] 156 template["required"].remove("identifiers") 157 return template
Template entry for a single model
def
field_to_jsonschema(self, field: rest_framework.fields.Field) -> dict:
159 def field_to_jsonschema(self, field: Field) -> dict: 160 """Convert a single field to json schema""" 161 if isinstance(field, Serializer): 162 result = self.to_jsonschema(field) 163 else: 164 try: 165 converter = field_to_converter[field] 166 result = converter.convert(field) 167 except KeyError: 168 if isinstance(field, JSONField): 169 result = {"type": "object", "additionalProperties": True} 170 elif isinstance(field, UUIDField): 171 result = {"type": "string", "format": "uuid"} 172 else: 173 raise 174 if field.label: 175 result["title"] = field.label 176 if field.help_text: 177 result["description"] = field.help_text 178 return self.clean_result(result)
Convert a single field to json schema
def
clean_result(self, result: dict) -> dict:
180 def clean_result(self, result: dict) -> dict: 181 """Remove enumNames from result, recursively""" 182 result.pop("enumNames", None) 183 for key, value in result.items(): 184 if isinstance(value, dict): 185 result[key] = self.clean_result(value) 186 return result
Remove enumNames from result, recursively
def
to_jsonschema(self, serializer: rest_framework.serializers.Serializer) -> dict:
188 def to_jsonschema(self, serializer: Serializer) -> dict: 189 """Convert serializer to json schema""" 190 properties = {} 191 required = [] 192 for name, field in serializer.fields.items(): 193 if field.read_only: 194 continue 195 sub_schema = self.field_to_jsonschema(field) 196 if field.required: 197 required.append(name) 198 properties[name] = sub_schema 199 200 result = {"type": "object", "properties": properties} 201 if required: 202 result["required"] = required 203 return result
Convert serializer to json schema
def
model_permissions(self, model: type[django.db.models.base.Model]) -> dict:
205 def model_permissions(self, model: type[Model]) -> dict: 206 perms = [x[0] for x in model._meta.permissions] 207 for action in model._meta.default_permissions: 208 perms.append(f"{action}_{model._meta.model_name}") 209 return { 210 "type": "array", 211 "items": { 212 "type": "object", 213 "required": ["permission"], 214 "properties": { 215 "permission": {"type": "string", "enum": sorted(perms)}, 216 "user": {"type": "integer"}, 217 "role": {"type": "string"}, 218 }, 219 }, 220 }