authentik.blueprints.api

Serializer mixin for managed models

  1"""Serializer mixin for managed models"""
  2
  3from json import JSONDecodeError, loads
  4from typing import cast
  5
  6from django.conf import settings
  7from django.core.files.uploadedfile import InMemoryUploadedFile
  8from django.utils.translation import gettext_lazy as _
  9from drf_spectacular.utils import extend_schema, inline_serializer
 10from rest_framework.decorators import action
 11from rest_framework.exceptions import PermissionDenied, ValidationError
 12from rest_framework.fields import (
 13    BooleanField,
 14    CharField,
 15    DateTimeField,
 16    FileField,
 17)
 18from rest_framework.parsers import MultiPartParser
 19from rest_framework.request import Request
 20from rest_framework.response import Response
 21from rest_framework.serializers import ListSerializer
 22from rest_framework.viewsets import ModelViewSet
 23
 24from authentik.api.validation import validate
 25from authentik.blueprints.models import BlueprintInstance
 26from authentik.blueprints.v1.common import Blueprint
 27from authentik.blueprints.v1.importer import Importer
 28from authentik.blueprints.v1.oci import OCI_PREFIX
 29from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_find_dict
 30from authentik.core.api.used_by import UsedByMixin
 31from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
 32from authentik.core.models import User
 33from authentik.events.logs import LogEventSerializer
 34from authentik.rbac.decorators import permission_required
 35
 36
 37def get_blueprints():
 38    if settings.DEBUG:
 39        return blueprints_find_dict()
 40    return blueprints_find_dict.send().get_result(block=True)
 41
 42
 43class BlueprintUploadSerializer(PassiveSerializer):
 44    """Serializer to upload file"""
 45
 46    file = FileField(required=False)
 47    path = CharField(required=False)
 48    context = CharField(required=False, allow_blank=True)
 49
 50    def validate_path(self, path: str) -> str:
 51        """Ensure the path (if set) specified is retrievable"""
 52        if path == "":
 53            return path
 54        files: list[dict] = get_blueprints()
 55        if path not in [file["path"] for file in files]:
 56            raise ValidationError(_("Blueprint file does not exist"))
 57        return path
 58
 59    def validate_context(self, context: str) -> dict:
 60        """Parse context as a JSON object"""
 61        if not context:
 62            return {}
 63        try:
 64            parsed = loads(context)
 65        except JSONDecodeError as exc:
 66            raise ValidationError(_("Context must be valid JSON")) from exc
 67        if not isinstance(parsed, dict):
 68            raise ValidationError(_("Context must be a JSON object"))
 69        return parsed
 70
 71
 72class ManagedSerializer:
 73    """Managed Serializer"""
 74
 75    managed = CharField(read_only=True, allow_null=True)
 76
 77
 78class MetadataSerializer(PassiveSerializer):
 79    """Serializer for blueprint metadata"""
 80
 81    name = CharField()
 82    labels = JSONDictField()
 83
 84
 85class BlueprintInstanceSerializer(ModelSerializer):
 86    """Info about a single blueprint instance file"""
 87
 88    def validate_path(self, path: str) -> str:
 89        """Ensure the path (if set) specified is retrievable"""
 90        if path == "" or path.startswith(OCI_PREFIX):
 91            return path
 92        files: list[dict] = get_blueprints()
 93        if path not in [file["path"] for file in files]:
 94            raise ValidationError(_("Blueprint file does not exist"))
 95        return path
 96
 97    def validate_content(self, content: str) -> str:
 98        """Ensure content (if set) is a valid blueprint"""
 99        if content == "":
100            return content
101        context = self.instance.context if self.instance else {}
102        valid, logs = Importer.from_string(content, context).validate()
103        if not valid:
104            raise ValidationError(
105                [
106                    _("Failed to validate blueprint"),
107                    *[f"- {x.event}" for x in logs],
108                ]
109            )
110        return content
111
112    def validate(self, attrs: dict) -> dict:
113        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
114            raise ValidationError(_("Either path or content must be set."))
115        return super().validate(attrs)
116
117    class Meta:
118        model = BlueprintInstance
119        fields = [
120            "pk",
121            "name",
122            "path",
123            "context",
124            "last_applied",
125            "last_applied_hash",
126            "status",
127            "enabled",
128            "managed_models",
129            "metadata",
130            "content",
131        ]
132        extra_kwargs = {
133            "status": {"read_only": True},
134            "last_applied": {"read_only": True},
135            "last_applied_hash": {"read_only": True},
136            "managed_models": {"read_only": True},
137            "metadata": {"read_only": True},
138        }
139
140
141def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None):
142    """Check for individual permissions for each model in a blueprint"""
143    for entry in blueprint.iter_entries():
144        full_model = entry.get_model(blueprint)
145        app, __, model = full_model.partition(".")
146        perms = [
147            f"{app}.add_{model}",
148            f"{app}.change_{model}",
149            f"{app}.delete_{model}",
150        ]
151        if explicit_action:
152            perms = [f"{app}.{explicit_action}_{model}"]
153        for perm in perms:
154            if not user.has_perm(perm):
155                raise PermissionDenied(
156                    {
157                        entry.id: _(
158                            "User lacks permission to create {model}".format_map(
159                                {
160                                    "model": full_model,
161                                }
162                            )
163                        )
164                    }
165                )
166
167
168class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet):
169    """Blueprint instances"""
170
171    serializer_class = BlueprintInstanceSerializer
172    queryset = BlueprintInstance.objects.all()
173    search_fields = ["name", "path"]
174    filterset_fields = ["name", "path"]
175    ordering = ["name"]
176
177    class BlueprintImportResultSerializer(PassiveSerializer):
178        """Logs of an attempted blueprint import"""
179
180        logs = LogEventSerializer(many=True, read_only=True)
181        success = BooleanField(read_only=True)
182
183    @extend_schema(
184        responses={
185            200: ListSerializer(
186                child=inline_serializer(
187                    "BlueprintFile",
188                    fields={
189                        "path": CharField(),
190                        "last_m": DateTimeField(),
191                        "hash": CharField(),
192                        "meta": MetadataSerializer(required=False, read_only=True),
193                    },
194                )
195            )
196        }
197    )
198    @action(detail=False, pagination_class=None, filter_backends=[])
199    def available(self, request: Request) -> Response:
200        """Get blueprints"""
201        files: list[dict] = get_blueprints()
202        return Response(files)
203
204    @permission_required("authentik_blueprints.view_blueprintinstance")
205    @extend_schema(
206        request=None,
207        responses={
208            200: BlueprintInstanceSerializer(),
209        },
210    )
211    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
212    def apply(self, request: Request, *args, **kwargs) -> Response:
213        """Apply a blueprint"""
214        blueprint = self.get_object()
215        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
216        return self.retrieve(request, *args, **kwargs)
217
218    @extend_schema(
219        request={"multipart/form-data": BlueprintUploadSerializer},
220        responses={200: BlueprintImportResultSerializer},
221    )
222    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
223    @validate(
224        BlueprintUploadSerializer,
225    )
226    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
227        """Import blueprint from .yaml file and apply it once, without creating an instance"""
228        string_contents = ""
229        if body.validated_data.get("file"):
230            file = cast(InMemoryUploadedFile, body.validated_data["file"])
231            string_contents = file.read().decode()
232        elif body.validated_data.get("path"):
233            string_contents = BlueprintInstance(
234                path=body.validated_data.get("path")
235            ).retrieve_file()
236        else:
237            raise ValidationError("Either path or file must be set")
238        context = body.validated_data.get("context") or {}
239        importer = Importer.from_string(string_contents, context)
240
241        check_blueprint_perms(importer.blueprint, request.user)
242
243        valid, logs = importer.validate()
244
245        import_response = self.BlueprintImportResultSerializer(
246            data={
247                "logs": [LogEventSerializer(log).data for log in logs],
248                "success": valid,
249            }
250        )
251        import_response.is_valid(raise_exception=True)
252
253        if valid:
254            import_response.initial_data["success"] = importer.apply()
255            import_response.is_valid()
256        return Response(data=import_response.initial_data, status=200)
def get_blueprints():
38def get_blueprints():
39    if settings.DEBUG:
40        return blueprints_find_dict()
41    return blueprints_find_dict.send().get_result(block=True)
class BlueprintUploadSerializer(authentik.core.api.utils.PassiveSerializer):
44class BlueprintUploadSerializer(PassiveSerializer):
45    """Serializer to upload file"""
46
47    file = FileField(required=False)
48    path = CharField(required=False)
49    context = CharField(required=False, allow_blank=True)
50
51    def validate_path(self, path: str) -> str:
52        """Ensure the path (if set) specified is retrievable"""
53        if path == "":
54            return path
55        files: list[dict] = get_blueprints()
56        if path not in [file["path"] for file in files]:
57            raise ValidationError(_("Blueprint file does not exist"))
58        return path
59
60    def validate_context(self, context: str) -> dict:
61        """Parse context as a JSON object"""
62        if not context:
63            return {}
64        try:
65            parsed = loads(context)
66        except JSONDecodeError as exc:
67            raise ValidationError(_("Context must be valid JSON")) from exc
68        if not isinstance(parsed, dict):
69            raise ValidationError(_("Context must be a JSON object"))
70        return parsed

Serializer to upload file

file
path
context
614    @property
615    def context(self):
616        """
617        Returns the context as passed to the root serializer on initialization.
618        """
619        return getattr(self.root, '_context', {})

Returns the context as passed to the root serializer on initialization.

def validate_path(self, path: str) -> str:
51    def validate_path(self, path: str) -> str:
52        """Ensure the path (if set) specified is retrievable"""
53        if path == "":
54            return path
55        files: list[dict] = get_blueprints()
56        if path not in [file["path"] for file in files]:
57            raise ValidationError(_("Blueprint file does not exist"))
58        return path

Ensure the path (if set) specified is retrievable

def validate_context(self, context: str) -> dict:
60    def validate_context(self, context: str) -> dict:
61        """Parse context as a JSON object"""
62        if not context:
63            return {}
64        try:
65            parsed = loads(context)
66        except JSONDecodeError as exc:
67            raise ValidationError(_("Context must be valid JSON")) from exc
68        if not isinstance(parsed, dict):
69            raise ValidationError(_("Context must be a JSON object"))
70        return parsed

Parse context as a JSON object

class ManagedSerializer:
73class ManagedSerializer:
74    """Managed Serializer"""
75
76    managed = CharField(read_only=True, allow_null=True)

Managed Serializer

managed = CharField(allow_null=True, read_only=True)
class MetadataSerializer(authentik.core.api.utils.PassiveSerializer):
79class MetadataSerializer(PassiveSerializer):
80    """Serializer for blueprint metadata"""
81
82    name = CharField()
83    labels = JSONDictField()

Serializer for blueprint metadata

name
labels
class BlueprintInstanceSerializer(authentik.core.api.utils.ModelSerializer):
 86class BlueprintInstanceSerializer(ModelSerializer):
 87    """Info about a single blueprint instance file"""
 88
 89    def validate_path(self, path: str) -> str:
 90        """Ensure the path (if set) specified is retrievable"""
 91        if path == "" or path.startswith(OCI_PREFIX):
 92            return path
 93        files: list[dict] = get_blueprints()
 94        if path not in [file["path"] for file in files]:
 95            raise ValidationError(_("Blueprint file does not exist"))
 96        return path
 97
 98    def validate_content(self, content: str) -> str:
 99        """Ensure content (if set) is a valid blueprint"""
100        if content == "":
101            return content
102        context = self.instance.context if self.instance else {}
103        valid, logs = Importer.from_string(content, context).validate()
104        if not valid:
105            raise ValidationError(
106                [
107                    _("Failed to validate blueprint"),
108                    *[f"- {x.event}" for x in logs],
109                ]
110            )
111        return content
112
113    def validate(self, attrs: dict) -> dict:
114        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
115            raise ValidationError(_("Either path or content must be set."))
116        return super().validate(attrs)
117
118    class Meta:
119        model = BlueprintInstance
120        fields = [
121            "pk",
122            "name",
123            "path",
124            "context",
125            "last_applied",
126            "last_applied_hash",
127            "status",
128            "enabled",
129            "managed_models",
130            "metadata",
131            "content",
132        ]
133        extra_kwargs = {
134            "status": {"read_only": True},
135            "last_applied": {"read_only": True},
136            "last_applied_hash": {"read_only": True},
137            "managed_models": {"read_only": True},
138            "metadata": {"read_only": True},
139        }

Info about a single blueprint instance file

def validate_path(self, path: str) -> str:
89    def validate_path(self, path: str) -> str:
90        """Ensure the path (if set) specified is retrievable"""
91        if path == "" or path.startswith(OCI_PREFIX):
92            return path
93        files: list[dict] = get_blueprints()
94        if path not in [file["path"] for file in files]:
95            raise ValidationError(_("Blueprint file does not exist"))
96        return path

Ensure the path (if set) specified is retrievable

def validate_content(self, content: str) -> str:
 98    def validate_content(self, content: str) -> str:
 99        """Ensure content (if set) is a valid blueprint"""
100        if content == "":
101            return content
102        context = self.instance.context if self.instance else {}
103        valid, logs = Importer.from_string(content, context).validate()
104        if not valid:
105            raise ValidationError(
106                [
107                    _("Failed to validate blueprint"),
108                    *[f"- {x.event}" for x in logs],
109                ]
110            )
111        return content

Ensure content (if set) is a valid blueprint

def validate(self, attrs: dict) -> dict:
113    def validate(self, attrs: dict) -> dict:
114        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
115            raise ValidationError(_("Either path or content must be set."))
116        return super().validate(attrs)
class BlueprintInstanceSerializer.Meta:
118    class Meta:
119        model = BlueprintInstance
120        fields = [
121            "pk",
122            "name",
123            "path",
124            "context",
125            "last_applied",
126            "last_applied_hash",
127            "status",
128            "enabled",
129            "managed_models",
130            "metadata",
131            "content",
132        ]
133        extra_kwargs = {
134            "status": {"read_only": True},
135            "last_applied": {"read_only": True},
136            "last_applied_hash": {"read_only": True},
137            "managed_models": {"read_only": True},
138            "metadata": {"read_only": True},
139        }
fields = ['pk', 'name', 'path', 'context', 'last_applied', 'last_applied_hash', 'status', 'enabled', 'managed_models', 'metadata', 'content']
extra_kwargs = {'status': {'read_only': True}, 'last_applied': {'read_only': True}, 'last_applied_hash': {'read_only': True}, 'managed_models': {'read_only': True}, 'metadata': {'read_only': True}}
def check_blueprint_perms( blueprint: authentik.blueprints.v1.common.Blueprint, user: authentik.core.models.User, explicit_action: str | None = None):
142def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None):
143    """Check for individual permissions for each model in a blueprint"""
144    for entry in blueprint.iter_entries():
145        full_model = entry.get_model(blueprint)
146        app, __, model = full_model.partition(".")
147        perms = [
148            f"{app}.add_{model}",
149            f"{app}.change_{model}",
150            f"{app}.delete_{model}",
151        ]
152        if explicit_action:
153            perms = [f"{app}.{explicit_action}_{model}"]
154        for perm in perms:
155            if not user.has_perm(perm):
156                raise PermissionDenied(
157                    {
158                        entry.id: _(
159                            "User lacks permission to create {model}".format_map(
160                                {
161                                    "model": full_model,
162                                }
163                            )
164                        )
165                    }
166                )

Check for individual permissions for each model in a blueprint

class BlueprintInstanceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
169class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet):
170    """Blueprint instances"""
171
172    serializer_class = BlueprintInstanceSerializer
173    queryset = BlueprintInstance.objects.all()
174    search_fields = ["name", "path"]
175    filterset_fields = ["name", "path"]
176    ordering = ["name"]
177
178    class BlueprintImportResultSerializer(PassiveSerializer):
179        """Logs of an attempted blueprint import"""
180
181        logs = LogEventSerializer(many=True, read_only=True)
182        success = BooleanField(read_only=True)
183
184    @extend_schema(
185        responses={
186            200: ListSerializer(
187                child=inline_serializer(
188                    "BlueprintFile",
189                    fields={
190                        "path": CharField(),
191                        "last_m": DateTimeField(),
192                        "hash": CharField(),
193                        "meta": MetadataSerializer(required=False, read_only=True),
194                    },
195                )
196            )
197        }
198    )
199    @action(detail=False, pagination_class=None, filter_backends=[])
200    def available(self, request: Request) -> Response:
201        """Get blueprints"""
202        files: list[dict] = get_blueprints()
203        return Response(files)
204
205    @permission_required("authentik_blueprints.view_blueprintinstance")
206    @extend_schema(
207        request=None,
208        responses={
209            200: BlueprintInstanceSerializer(),
210        },
211    )
212    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
213    def apply(self, request: Request, *args, **kwargs) -> Response:
214        """Apply a blueprint"""
215        blueprint = self.get_object()
216        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
217        return self.retrieve(request, *args, **kwargs)
218
219    @extend_schema(
220        request={"multipart/form-data": BlueprintUploadSerializer},
221        responses={200: BlueprintImportResultSerializer},
222    )
223    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
224    @validate(
225        BlueprintUploadSerializer,
226    )
227    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
228        """Import blueprint from .yaml file and apply it once, without creating an instance"""
229        string_contents = ""
230        if body.validated_data.get("file"):
231            file = cast(InMemoryUploadedFile, body.validated_data["file"])
232            string_contents = file.read().decode()
233        elif body.validated_data.get("path"):
234            string_contents = BlueprintInstance(
235                path=body.validated_data.get("path")
236            ).retrieve_file()
237        else:
238            raise ValidationError("Either path or file must be set")
239        context = body.validated_data.get("context") or {}
240        importer = Importer.from_string(string_contents, context)
241
242        check_blueprint_perms(importer.blueprint, request.user)
243
244        valid, logs = importer.validate()
245
246        import_response = self.BlueprintImportResultSerializer(
247            data={
248                "logs": [LogEventSerializer(log).data for log in logs],
249                "success": valid,
250            }
251        )
252        import_response.is_valid(raise_exception=True)
253
254        if valid:
255            import_response.initial_data["success"] = importer.apply()
256            import_response.is_valid()
257        return Response(data=import_response.initial_data, status=200)

Blueprint instances

serializer_class = <class 'BlueprintInstanceSerializer'>
queryset = <QuerySet []>
search_fields = ['name', 'path']
filterset_fields = ['name', 'path']
ordering = ['name']
@extend_schema(responses={200: ListSerializer(child=inline_serializer('BlueprintFile', fields={'path': CharField(), 'last_m': DateTimeField(), 'hash': CharField(), 'meta': MetadataSerializer(required=False, read_only=True)}))})
@action(detail=False, pagination_class=None, filter_backends=[])
def available( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
184    @extend_schema(
185        responses={
186            200: ListSerializer(
187                child=inline_serializer(
188                    "BlueprintFile",
189                    fields={
190                        "path": CharField(),
191                        "last_m": DateTimeField(),
192                        "hash": CharField(),
193                        "meta": MetadataSerializer(required=False, read_only=True),
194                    },
195                )
196            )
197        }
198    )
199    @action(detail=False, pagination_class=None, filter_backends=[])
200    def available(self, request: Request) -> Response:
201        """Get blueprints"""
202        files: list[dict] = get_blueprints()
203        return Response(files)

Get blueprints

@permission_required('authentik_blueprints.view_blueprintinstance')
@extend_schema(request=None, responses={200: BlueprintInstanceSerializer()})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
def apply( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
205    @permission_required("authentik_blueprints.view_blueprintinstance")
206    @extend_schema(
207        request=None,
208        responses={
209            200: BlueprintInstanceSerializer(),
210        },
211    )
212    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
213    def apply(self, request: Request, *args, **kwargs) -> Response:
214        """Apply a blueprint"""
215        blueprint = self.get_object()
216        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
217        return self.retrieve(request, *args, **kwargs)

Apply a blueprint

@extend_schema(request={'multipart/form-data': BlueprintUploadSerializer}, responses={200: BlueprintImportResultSerializer})
@action(url_path='import', detail=False, methods=['POST'], parser_classes=(MultiPartParser,))
@validate(BlueprintUploadSerializer)
def import_( self, request: rest_framework.request.Request, body: BlueprintUploadSerializer) -> rest_framework.response.Response:
219    @extend_schema(
220        request={"multipart/form-data": BlueprintUploadSerializer},
221        responses={200: BlueprintImportResultSerializer},
222    )
223    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
224    @validate(
225        BlueprintUploadSerializer,
226    )
227    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
228        """Import blueprint from .yaml file and apply it once, without creating an instance"""
229        string_contents = ""
230        if body.validated_data.get("file"):
231            file = cast(InMemoryUploadedFile, body.validated_data["file"])
232            string_contents = file.read().decode()
233        elif body.validated_data.get("path"):
234            string_contents = BlueprintInstance(
235                path=body.validated_data.get("path")
236            ).retrieve_file()
237        else:
238            raise ValidationError("Either path or file must be set")
239        context = body.validated_data.get("context") or {}
240        importer = Importer.from_string(string_contents, context)
241
242        check_blueprint_perms(importer.blueprint, request.user)
243
244        valid, logs = importer.validate()
245
246        import_response = self.BlueprintImportResultSerializer(
247            data={
248                "logs": [LogEventSerializer(log).data for log in logs],
249                "success": valid,
250            }
251        )
252        import_response.is_valid(raise_exception=True)
253
254        if valid:
255            import_response.initial_data["success"] = importer.apply()
256            import_response.is_valid()
257        return Response(data=import_response.initial_data, status=200)

Import blueprint from .yaml file and apply it once, without creating an instance

name = None
description = None
suffix = None
detail = None
basename = None
class BlueprintInstanceViewSet.BlueprintImportResultSerializer(authentik.core.api.utils.PassiveSerializer):
178    class BlueprintImportResultSerializer(PassiveSerializer):
179        """Logs of an attempted blueprint import"""
180
181        logs = LogEventSerializer(many=True, read_only=True)
182        success = BooleanField(read_only=True)

Logs of an attempted blueprint import

logs
success