authentik.blueprints.v1.schema

Generate JSON Schema for blueprints

  1"""Generate JSON Schema for blueprints"""
  2
  3from typing import Any
  4
  5from django.db.models import Model, fields
  6from django.db.models.fields.related import OneToOneField
  7from drf_jsonschema_serializer.convert import converter, field_to_converter
  8from rest_framework.fields import Field, JSONField, UUIDField
  9from rest_framework.relations import PrimaryKeyRelatedField
 10from rest_framework.serializers import Serializer
 11from structlog.stdlib import get_logger
 12
 13from authentik import authentik_version
 14from authentik.blueprints.v1.common import BlueprintEntryDesiredState
 15from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT, is_model_allowed
 16from authentik.blueprints.v1.meta.registry import BaseMetaModel, registry
 17from authentik.lib.models import SerializerModel
 18
 19LOGGER = get_logger()
 20
 21
 22@converter
 23class PrimaryKeyRelatedFieldConverter:
 24    """Custom primary key field converter which is aware of non-integer based PKs
 25
 26    This is not an exhaustive fix for other non-int PKs, however in authentik we either
 27    use UUIDs or ints"""
 28
 29    field_class = PrimaryKeyRelatedField
 30
 31    def convert(self, field: PrimaryKeyRelatedField):
 32        model: Model = field.queryset.model
 33        pk_field = model._meta.pk
 34        if isinstance(pk_field, OneToOneField):
 35            pk_field = pk_field.related_fields[0][1]
 36        if isinstance(pk_field, fields.UUIDField):
 37            return {"type": "string", "format": "uuid"}
 38        return {"type": "integer"}
 39
 40
 41class SchemaBuilder:
 42    """Generate JSON Schema for blueprints"""
 43
 44    schema: dict
 45
 46    def __init__(self):
 47        self.schema = {
 48            "$schema": "http://json-schema.org/draft-07/schema",
 49            "$id": "https://goauthentik.io/blueprints/schema.json",
 50            "type": "object",
 51            "title": f"authentik {authentik_version()} Blueprint schema",
 52            "required": ["version", "entries"],
 53            "properties": {
 54                "version": {
 55                    "$id": "#/properties/version",
 56                    "type": "integer",
 57                    "title": "Blueprint version",
 58                    "default": 1,
 59                },
 60                "metadata": {
 61                    "$id": "#/properties/metadata",
 62                    "type": "object",
 63                    "required": ["name"],
 64                    "properties": {
 65                        "name": {"type": "string"},
 66                        "labels": {"type": "object", "additionalProperties": {"type": "string"}},
 67                    },
 68                },
 69                "context": {
 70                    "$id": "#/properties/context",
 71                    "type": "object",
 72                    "additionalProperties": True,
 73                },
 74                "entries": {
 75                    "anyOf": [
 76                        {
 77                            "type": "array",
 78                            "items": {"$ref": "#/$defs/blueprint_entry"},
 79                        },
 80                        {
 81                            "type": "object",
 82                            "additionalProperties": {
 83                                "type": "array",
 84                                "items": {"$ref": "#/$defs/blueprint_entry"},
 85                            },
 86                        },
 87                    ],
 88                },
 89            },
 90            "$defs": {"blueprint_entry": {"oneOf": []}},
 91        }
 92
 93    @staticmethod
 94    def json_default(value: Any) -> Any:
 95        """Helper that handles gettext_lazy strings that JSON doesn't handle"""
 96        return str(value)
 97
 98    def build(self):
 99        """Build all models into the schema"""
100        for model in registry.get_models():
101            if issubclass(model, BaseMetaModel):
102                serializer_class = model.serializer()
103            else:
104                if model._meta.abstract:
105                    continue
106                if not is_model_allowed(model):
107                    continue
108                model_instance: Model = model()
109                if not isinstance(model_instance, SerializerModel):
110                    continue
111                try:
112                    serializer_class = model_instance.serializer
113                except NotImplementedError as exc:
114                    raise ValueError(f"SerializerModel not implemented by {model}") from exc
115            serializer = serializer_class(
116                context={
117                    SERIALIZER_CONTEXT_BLUEPRINT: False,
118                }
119            )
120            model_path = f"{model._meta.app_label}.{model._meta.model_name}"
121            self.schema["$defs"]["blueprint_entry"]["oneOf"].append(
122                self.template_entry(model_path, model, serializer)
123            )
124
125    def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict:
126        """Template entry for a single model"""
127        model_schema = self.to_jsonschema(serializer)
128        model_schema["required"] = []
129        def_name = f"model_{model_path}"
130        def_path = f"#/$defs/{def_name}"
131        self.schema["$defs"][def_name] = model_schema
132        def_name_perm = f"model_{model_path}_permissions"
133        def_path_perm = f"#/$defs/{def_name_perm}"
134        self.schema["$defs"][def_name_perm] = self.model_permissions(model)
135        template = {
136            "type": "object",
137            "required": ["model", "identifiers"],
138            "properties": {
139                "model": {"const": model_path},
140                "id": {"type": "string"},
141                "state": {
142                    "type": "string",
143                    "enum": sorted([s.value for s in BlueprintEntryDesiredState]),
144                    "default": "present",
145                },
146                "conditions": {"type": "array", "items": {"type": "boolean"}},
147                "permissions": {"$ref": def_path_perm},
148                "attrs": {"$ref": def_path},
149                "identifiers": {"$ref": def_path},
150            },
151        }
152        # Meta models don't require identifiers, as there's no matching database model to find
153        if issubclass(model, BaseMetaModel):
154            del template["properties"]["identifiers"]
155            template["required"].remove("identifiers")
156        return template
157
158    def field_to_jsonschema(self, field: Field) -> dict:
159        """Convert a single field to json schema"""
160        if isinstance(field, Serializer):
161            result = self.to_jsonschema(field)
162        else:
163            try:
164                converter = field_to_converter[field]
165                result = converter.convert(field)
166            except KeyError:
167                if isinstance(field, JSONField):
168                    result = {"type": "object", "additionalProperties": True}
169                elif isinstance(field, UUIDField):
170                    result = {"type": "string", "format": "uuid"}
171                else:
172                    raise
173        if field.label:
174            result["title"] = field.label
175        if field.help_text:
176            result["description"] = field.help_text
177        return self.clean_result(result)
178
179    def clean_result(self, result: dict) -> dict:
180        """Remove enumNames from result, recursively"""
181        result.pop("enumNames", None)
182        for key, value in result.items():
183            if isinstance(value, dict):
184                result[key] = self.clean_result(value)
185        return result
186
187    def to_jsonschema(self, serializer: Serializer) -> dict:
188        """Convert serializer to json schema"""
189        properties = {}
190        required = []
191        for name, field in serializer.fields.items():
192            if field.read_only:
193                continue
194            sub_schema = self.field_to_jsonschema(field)
195            if field.required:
196                required.append(name)
197            properties[name] = sub_schema
198
199        result = {"type": "object", "properties": properties}
200        if required:
201            result["required"] = required
202        return result
203
204    def model_permissions(self, model: type[Model]) -> dict:
205        perms = [x[0] for x in model._meta.permissions]
206        for action in model._meta.default_permissions:
207            perms.append(f"{action}_{model._meta.model_name}")
208        return {
209            "type": "array",
210            "items": {
211                "type": "object",
212                "required": ["permission"],
213                "properties": {
214                    "permission": {"type": "string", "enum": sorted(perms)},
215                    "user": {"type": "integer"},
216                    "role": {"type": "string"},
217                },
218            },
219        }
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@converter
class PrimaryKeyRelatedFieldConverter:
23@converter
24class PrimaryKeyRelatedFieldConverter:
25    """Custom primary key field converter which is aware of non-integer based PKs
26
27    This is not an exhaustive fix for other non-int PKs, however in authentik we either
28    use UUIDs or ints"""
29
30    field_class = PrimaryKeyRelatedField
31
32    def convert(self, field: PrimaryKeyRelatedField):
33        model: Model = field.queryset.model
34        pk_field = model._meta.pk
35        if isinstance(pk_field, OneToOneField):
36            pk_field = pk_field.related_fields[0][1]
37        if isinstance(pk_field, fields.UUIDField):
38            return {"type": "string", "format": "uuid"}
39        return {"type": "integer"}

Custom primary key field converter which is aware of non-integer based PKs

This is not an exhaustive fix for other non-int PKs, however in authentik we either use UUIDs or ints

field_class = <class 'rest_framework.relations.PrimaryKeyRelatedField'>
def convert(self, field: rest_framework.relations.PrimaryKeyRelatedField):
32    def convert(self, field: PrimaryKeyRelatedField):
33        model: Model = field.queryset.model
34        pk_field = model._meta.pk
35        if isinstance(pk_field, OneToOneField):
36            pk_field = pk_field.related_fields[0][1]
37        if isinstance(pk_field, fields.UUIDField):
38            return {"type": "string", "format": "uuid"}
39        return {"type": "integer"}
class SchemaBuilder:
 42class SchemaBuilder:
 43    """Generate JSON Schema for blueprints"""
 44
 45    schema: dict
 46
 47    def __init__(self):
 48        self.schema = {
 49            "$schema": "http://json-schema.org/draft-07/schema",
 50            "$id": "https://goauthentik.io/blueprints/schema.json",
 51            "type": "object",
 52            "title": f"authentik {authentik_version()} Blueprint schema",
 53            "required": ["version", "entries"],
 54            "properties": {
 55                "version": {
 56                    "$id": "#/properties/version",
 57                    "type": "integer",
 58                    "title": "Blueprint version",
 59                    "default": 1,
 60                },
 61                "metadata": {
 62                    "$id": "#/properties/metadata",
 63                    "type": "object",
 64                    "required": ["name"],
 65                    "properties": {
 66                        "name": {"type": "string"},
 67                        "labels": {"type": "object", "additionalProperties": {"type": "string"}},
 68                    },
 69                },
 70                "context": {
 71                    "$id": "#/properties/context",
 72                    "type": "object",
 73                    "additionalProperties": True,
 74                },
 75                "entries": {
 76                    "anyOf": [
 77                        {
 78                            "type": "array",
 79                            "items": {"$ref": "#/$defs/blueprint_entry"},
 80                        },
 81                        {
 82                            "type": "object",
 83                            "additionalProperties": {
 84                                "type": "array",
 85                                "items": {"$ref": "#/$defs/blueprint_entry"},
 86                            },
 87                        },
 88                    ],
 89                },
 90            },
 91            "$defs": {"blueprint_entry": {"oneOf": []}},
 92        }
 93
 94    @staticmethod
 95    def json_default(value: Any) -> Any:
 96        """Helper that handles gettext_lazy strings that JSON doesn't handle"""
 97        return str(value)
 98
 99    def build(self):
100        """Build all models into the schema"""
101        for model in registry.get_models():
102            if issubclass(model, BaseMetaModel):
103                serializer_class = model.serializer()
104            else:
105                if model._meta.abstract:
106                    continue
107                if not is_model_allowed(model):
108                    continue
109                model_instance: Model = model()
110                if not isinstance(model_instance, SerializerModel):
111                    continue
112                try:
113                    serializer_class = model_instance.serializer
114                except NotImplementedError as exc:
115                    raise ValueError(f"SerializerModel not implemented by {model}") from exc
116            serializer = serializer_class(
117                context={
118                    SERIALIZER_CONTEXT_BLUEPRINT: False,
119                }
120            )
121            model_path = f"{model._meta.app_label}.{model._meta.model_name}"
122            self.schema["$defs"]["blueprint_entry"]["oneOf"].append(
123                self.template_entry(model_path, model, serializer)
124            )
125
126    def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict:
127        """Template entry for a single model"""
128        model_schema = self.to_jsonschema(serializer)
129        model_schema["required"] = []
130        def_name = f"model_{model_path}"
131        def_path = f"#/$defs/{def_name}"
132        self.schema["$defs"][def_name] = model_schema
133        def_name_perm = f"model_{model_path}_permissions"
134        def_path_perm = f"#/$defs/{def_name_perm}"
135        self.schema["$defs"][def_name_perm] = self.model_permissions(model)
136        template = {
137            "type": "object",
138            "required": ["model", "identifiers"],
139            "properties": {
140                "model": {"const": model_path},
141                "id": {"type": "string"},
142                "state": {
143                    "type": "string",
144                    "enum": sorted([s.value for s in BlueprintEntryDesiredState]),
145                    "default": "present",
146                },
147                "conditions": {"type": "array", "items": {"type": "boolean"}},
148                "permissions": {"$ref": def_path_perm},
149                "attrs": {"$ref": def_path},
150                "identifiers": {"$ref": def_path},
151            },
152        }
153        # Meta models don't require identifiers, as there's no matching database model to find
154        if issubclass(model, BaseMetaModel):
155            del template["properties"]["identifiers"]
156            template["required"].remove("identifiers")
157        return template
158
159    def field_to_jsonschema(self, field: Field) -> dict:
160        """Convert a single field to json schema"""
161        if isinstance(field, Serializer):
162            result = self.to_jsonschema(field)
163        else:
164            try:
165                converter = field_to_converter[field]
166                result = converter.convert(field)
167            except KeyError:
168                if isinstance(field, JSONField):
169                    result = {"type": "object", "additionalProperties": True}
170                elif isinstance(field, UUIDField):
171                    result = {"type": "string", "format": "uuid"}
172                else:
173                    raise
174        if field.label:
175            result["title"] = field.label
176        if field.help_text:
177            result["description"] = field.help_text
178        return self.clean_result(result)
179
180    def clean_result(self, result: dict) -> dict:
181        """Remove enumNames from result, recursively"""
182        result.pop("enumNames", None)
183        for key, value in result.items():
184            if isinstance(value, dict):
185                result[key] = self.clean_result(value)
186        return result
187
188    def to_jsonschema(self, serializer: Serializer) -> dict:
189        """Convert serializer to json schema"""
190        properties = {}
191        required = []
192        for name, field in serializer.fields.items():
193            if field.read_only:
194                continue
195            sub_schema = self.field_to_jsonschema(field)
196            if field.required:
197                required.append(name)
198            properties[name] = sub_schema
199
200        result = {"type": "object", "properties": properties}
201        if required:
202            result["required"] = required
203        return result
204
205    def model_permissions(self, model: type[Model]) -> dict:
206        perms = [x[0] for x in model._meta.permissions]
207        for action in model._meta.default_permissions:
208            perms.append(f"{action}_{model._meta.model_name}")
209        return {
210            "type": "array",
211            "items": {
212                "type": "object",
213                "required": ["permission"],
214                "properties": {
215                    "permission": {"type": "string", "enum": sorted(perms)},
216                    "user": {"type": "integer"},
217                    "role": {"type": "string"},
218                },
219            },
220        }

Generate JSON Schema for blueprints

schema: dict
@staticmethod
def json_default(value: Any) -> Any:
94    @staticmethod
95    def json_default(value: Any) -> Any:
96        """Helper that handles gettext_lazy strings that JSON doesn't handle"""
97        return str(value)

Helper that handles gettext_lazy strings that JSON doesn't handle

def build(self):
 99    def build(self):
100        """Build all models into the schema"""
101        for model in registry.get_models():
102            if issubclass(model, BaseMetaModel):
103                serializer_class = model.serializer()
104            else:
105                if model._meta.abstract:
106                    continue
107                if not is_model_allowed(model):
108                    continue
109                model_instance: Model = model()
110                if not isinstance(model_instance, SerializerModel):
111                    continue
112                try:
113                    serializer_class = model_instance.serializer
114                except NotImplementedError as exc:
115                    raise ValueError(f"SerializerModel not implemented by {model}") from exc
116            serializer = serializer_class(
117                context={
118                    SERIALIZER_CONTEXT_BLUEPRINT: False,
119                }
120            )
121            model_path = f"{model._meta.app_label}.{model._meta.model_name}"
122            self.schema["$defs"]["blueprint_entry"]["oneOf"].append(
123                self.template_entry(model_path, model, serializer)
124            )

Build all models into the schema

def template_entry( self, model_path: str, model: type[django.db.models.base.Model], serializer: rest_framework.serializers.Serializer) -> dict:
126    def template_entry(self, model_path: str, model: type[Model], serializer: Serializer) -> dict:
127        """Template entry for a single model"""
128        model_schema = self.to_jsonschema(serializer)
129        model_schema["required"] = []
130        def_name = f"model_{model_path}"
131        def_path = f"#/$defs/{def_name}"
132        self.schema["$defs"][def_name] = model_schema
133        def_name_perm = f"model_{model_path}_permissions"
134        def_path_perm = f"#/$defs/{def_name_perm}"
135        self.schema["$defs"][def_name_perm] = self.model_permissions(model)
136        template = {
137            "type": "object",
138            "required": ["model", "identifiers"],
139            "properties": {
140                "model": {"const": model_path},
141                "id": {"type": "string"},
142                "state": {
143                    "type": "string",
144                    "enum": sorted([s.value for s in BlueprintEntryDesiredState]),
145                    "default": "present",
146                },
147                "conditions": {"type": "array", "items": {"type": "boolean"}},
148                "permissions": {"$ref": def_path_perm},
149                "attrs": {"$ref": def_path},
150                "identifiers": {"$ref": def_path},
151            },
152        }
153        # Meta models don't require identifiers, as there's no matching database model to find
154        if issubclass(model, BaseMetaModel):
155            del template["properties"]["identifiers"]
156            template["required"].remove("identifiers")
157        return template

Template entry for a single model

def field_to_jsonschema(self, field: rest_framework.fields.Field) -> dict:
159    def field_to_jsonschema(self, field: Field) -> dict:
160        """Convert a single field to json schema"""
161        if isinstance(field, Serializer):
162            result = self.to_jsonschema(field)
163        else:
164            try:
165                converter = field_to_converter[field]
166                result = converter.convert(field)
167            except KeyError:
168                if isinstance(field, JSONField):
169                    result = {"type": "object", "additionalProperties": True}
170                elif isinstance(field, UUIDField):
171                    result = {"type": "string", "format": "uuid"}
172                else:
173                    raise
174        if field.label:
175            result["title"] = field.label
176        if field.help_text:
177            result["description"] = field.help_text
178        return self.clean_result(result)

Convert a single field to json schema

def clean_result(self, result: dict) -> dict:
180    def clean_result(self, result: dict) -> dict:
181        """Remove enumNames from result, recursively"""
182        result.pop("enumNames", None)
183        for key, value in result.items():
184            if isinstance(value, dict):
185                result[key] = self.clean_result(value)
186        return result

Remove enumNames from result, recursively

def to_jsonschema(self, serializer: rest_framework.serializers.Serializer) -> dict:
188    def to_jsonschema(self, serializer: Serializer) -> dict:
189        """Convert serializer to json schema"""
190        properties = {}
191        required = []
192        for name, field in serializer.fields.items():
193            if field.read_only:
194                continue
195            sub_schema = self.field_to_jsonschema(field)
196            if field.required:
197                required.append(name)
198            properties[name] = sub_schema
199
200        result = {"type": "object", "properties": properties}
201        if required:
202            result["required"] = required
203        return result

Convert serializer to json schema

def model_permissions(self, model: type[django.db.models.base.Model]) -> dict:
205    def model_permissions(self, model: type[Model]) -> dict:
206        perms = [x[0] for x in model._meta.permissions]
207        for action in model._meta.default_permissions:
208            perms.append(f"{action}_{model._meta.model_name}")
209        return {
210            "type": "array",
211            "items": {
212                "type": "object",
213                "required": ["permission"],
214                "properties": {
215                    "permission": {"type": "string", "enum": sorted(perms)},
216                    "user": {"type": "integer"},
217                    "role": {"type": "string"},
218                },
219            },
220        }