authentik.blueprints.api

Serializer mixin for managed models

  1"""Serializer mixin for managed models"""
  2
  3from typing import cast
  4
  5from django.conf import settings
  6from django.core.files.uploadedfile import InMemoryUploadedFile
  7from django.utils.translation import gettext_lazy as _
  8from drf_spectacular.utils import extend_schema, inline_serializer
  9from rest_framework.decorators import action
 10from rest_framework.exceptions import PermissionDenied, ValidationError
 11from rest_framework.fields import (
 12    BooleanField,
 13    CharField,
 14    DateTimeField,
 15    FileField,
 16)
 17from rest_framework.parsers import MultiPartParser
 18from rest_framework.request import Request
 19from rest_framework.response import Response
 20from rest_framework.serializers import ListSerializer
 21from rest_framework.viewsets import ModelViewSet
 22
 23from authentik.api.validation import validate
 24from authentik.blueprints.models import BlueprintInstance
 25from authentik.blueprints.v1.common import Blueprint
 26from authentik.blueprints.v1.importer import Importer
 27from authentik.blueprints.v1.oci import OCI_PREFIX
 28from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_find_dict
 29from authentik.core.api.used_by import UsedByMixin
 30from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
 31from authentik.core.models import User
 32from authentik.events.logs import LogEventSerializer
 33from authentik.rbac.decorators import permission_required
 34
 35
 36def get_blueprints():
 37    if settings.DEBUG:
 38        return blueprints_find_dict()
 39    return blueprints_find_dict.send().get_result(block=True)
 40
 41
 42class BlueprintUploadSerializer(PassiveSerializer):
 43    """Serializer to upload file"""
 44
 45    file = FileField(required=False)
 46    path = CharField(required=False)
 47
 48    def validate_path(self, path: str) -> str:
 49        """Ensure the path (if set) specified is retrievable"""
 50        if path == "":
 51            return path
 52        files: list[dict] = get_blueprints()
 53        if path not in [file["path"] for file in files]:
 54            raise ValidationError(_("Blueprint file does not exist"))
 55        return path
 56
 57
 58class ManagedSerializer:
 59    """Managed Serializer"""
 60
 61    managed = CharField(read_only=True, allow_null=True)
 62
 63
 64class MetadataSerializer(PassiveSerializer):
 65    """Serializer for blueprint metadata"""
 66
 67    name = CharField()
 68    labels = JSONDictField()
 69
 70
 71class BlueprintInstanceSerializer(ModelSerializer):
 72    """Info about a single blueprint instance file"""
 73
 74    def validate_path(self, path: str) -> str:
 75        """Ensure the path (if set) specified is retrievable"""
 76        if path == "" or path.startswith(OCI_PREFIX):
 77            return path
 78        files: list[dict] = get_blueprints()
 79        if path not in [file["path"] for file in files]:
 80            raise ValidationError(_("Blueprint file does not exist"))
 81        return path
 82
 83    def validate_content(self, content: str) -> str:
 84        """Ensure content (if set) is a valid blueprint"""
 85        if content == "":
 86            return content
 87        context = self.instance.context if self.instance else {}
 88        valid, logs = Importer.from_string(content, context).validate()
 89        if not valid:
 90            raise ValidationError(
 91                [
 92                    _("Failed to validate blueprint"),
 93                    *[f"- {x.event}" for x in logs],
 94                ]
 95            )
 96        return content
 97
 98    def validate(self, attrs: dict) -> dict:
 99        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
100            raise ValidationError(_("Either path or content must be set."))
101        return super().validate(attrs)
102
103    class Meta:
104        model = BlueprintInstance
105        fields = [
106            "pk",
107            "name",
108            "path",
109            "context",
110            "last_applied",
111            "last_applied_hash",
112            "status",
113            "enabled",
114            "managed_models",
115            "metadata",
116            "content",
117        ]
118        extra_kwargs = {
119            "status": {"read_only": True},
120            "last_applied": {"read_only": True},
121            "last_applied_hash": {"read_only": True},
122            "managed_models": {"read_only": True},
123            "metadata": {"read_only": True},
124        }
125
126
127def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None):
128    """Check for individual permissions for each model in a blueprint"""
129    for entry in blueprint.entries:
130        full_model = entry.get_model(blueprint)
131        app, __, model = full_model.partition(".")
132        perms = [
133            f"{app}.add_{model}",
134            f"{app}.change_{model}",
135            f"{app}.delete_{model}",
136        ]
137        if explicit_action:
138            perms = [f"{app}.{explicit_action}_{model}"]
139        for perm in perms:
140            if not user.has_perm(perm):
141                raise PermissionDenied(
142                    {
143                        entry.id: _(
144                            "User lacks permission to create {model}".format_map(
145                                {
146                                    "model": full_model,
147                                }
148                            )
149                        )
150                    }
151                )
152
153
154class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet):
155    """Blueprint instances"""
156
157    serializer_class = BlueprintInstanceSerializer
158    queryset = BlueprintInstance.objects.all()
159    search_fields = ["name", "path"]
160    filterset_fields = ["name", "path"]
161    ordering = ["name"]
162
163    class BlueprintImportResultSerializer(PassiveSerializer):
164        """Logs of an attempted blueprint import"""
165
166        logs = LogEventSerializer(many=True, read_only=True)
167        success = BooleanField(read_only=True)
168
169    @extend_schema(
170        responses={
171            200: ListSerializer(
172                child=inline_serializer(
173                    "BlueprintFile",
174                    fields={
175                        "path": CharField(),
176                        "last_m": DateTimeField(),
177                        "hash": CharField(),
178                        "meta": MetadataSerializer(required=False, read_only=True),
179                    },
180                )
181            )
182        }
183    )
184    @action(detail=False, pagination_class=None, filter_backends=[])
185    def available(self, request: Request) -> Response:
186        """Get blueprints"""
187        files: list[dict] = get_blueprints()
188        return Response(files)
189
190    @permission_required("authentik_blueprints.view_blueprintinstance")
191    @extend_schema(
192        request=None,
193        responses={
194            200: BlueprintInstanceSerializer(),
195        },
196    )
197    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
198    def apply(self, request: Request, *args, **kwargs) -> Response:
199        """Apply a blueprint"""
200        blueprint = self.get_object()
201        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
202        return self.retrieve(request, *args, **kwargs)
203
204    @extend_schema(
205        request={"multipart/form-data": BlueprintUploadSerializer},
206        responses={
207            204: BlueprintImportResultSerializer,
208            400: BlueprintImportResultSerializer,
209        },
210    )
211    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
212    @validate(
213        BlueprintUploadSerializer,
214    )
215    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
216        """Import blueprint from .yaml file and apply it once, without creating an instance"""
217        string_contents = ""
218        if body.validated_data.get("file"):
219            file = cast(InMemoryUploadedFile, body.validated_data["file"])
220            string_contents = file.read().decode()
221        elif body.validated_data.get("path"):
222            string_contents = BlueprintInstance(
223                path=body.validated_data.get("path")
224            ).retrieve_file()
225        else:
226            raise ValidationError("Either path or file must be set")
227        importer = Importer.from_string(string_contents)
228
229        check_blueprint_perms(importer.blueprint, request.user)
230
231        valid, logs = importer.validate()
232
233        import_response = self.BlueprintImportResultSerializer(
234            data={
235                "logs": [],
236                "success": False,
237            }
238        )
239        import_response.is_valid(raise_exception=True)
240
241        import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs]
242        import_response.initial_data["success"] = valid
243        import_response.is_valid()
244        if not valid:
245            return Response(data=import_response.initial_data, status=200)
246
247        successful = importer.apply()
248        import_response.initial_data["success"] = successful
249        import_response.is_valid()
250        if not successful:
251            return Response(data=import_response.initial_data, status=200)
252        return Response(data=import_response.initial_data, status=200)
def get_blueprints():
37def get_blueprints():
38    if settings.DEBUG:
39        return blueprints_find_dict()
40    return blueprints_find_dict.send().get_result(block=True)
class BlueprintUploadSerializer(authentik.core.api.utils.PassiveSerializer):
43class BlueprintUploadSerializer(PassiveSerializer):
44    """Serializer to upload file"""
45
46    file = FileField(required=False)
47    path = CharField(required=False)
48
49    def validate_path(self, path: str) -> str:
50        """Ensure the path (if set) specified is retrievable"""
51        if path == "":
52            return path
53        files: list[dict] = get_blueprints()
54        if path not in [file["path"] for file in files]:
55            raise ValidationError(_("Blueprint file does not exist"))
56        return path

Serializer to upload file

file
path
def validate_path(self, path: str) -> str:
49    def validate_path(self, path: str) -> str:
50        """Ensure the path (if set) specified is retrievable"""
51        if path == "":
52            return path
53        files: list[dict] = get_blueprints()
54        if path not in [file["path"] for file in files]:
55            raise ValidationError(_("Blueprint file does not exist"))
56        return path

Ensure the path (if set) specified is retrievable

class ManagedSerializer:
59class ManagedSerializer:
60    """Managed Serializer"""
61
62    managed = CharField(read_only=True, allow_null=True)

Managed Serializer

managed = CharField(allow_null=True, read_only=True)
class MetadataSerializer(authentik.core.api.utils.PassiveSerializer):
65class MetadataSerializer(PassiveSerializer):
66    """Serializer for blueprint metadata"""
67
68    name = CharField()
69    labels = JSONDictField()

Serializer for blueprint metadata

name
labels
class BlueprintInstanceSerializer(authentik.core.api.utils.ModelSerializer):
 72class BlueprintInstanceSerializer(ModelSerializer):
 73    """Info about a single blueprint instance file"""
 74
 75    def validate_path(self, path: str) -> str:
 76        """Ensure the path (if set) specified is retrievable"""
 77        if path == "" or path.startswith(OCI_PREFIX):
 78            return path
 79        files: list[dict] = get_blueprints()
 80        if path not in [file["path"] for file in files]:
 81            raise ValidationError(_("Blueprint file does not exist"))
 82        return path
 83
 84    def validate_content(self, content: str) -> str:
 85        """Ensure content (if set) is a valid blueprint"""
 86        if content == "":
 87            return content
 88        context = self.instance.context if self.instance else {}
 89        valid, logs = Importer.from_string(content, context).validate()
 90        if not valid:
 91            raise ValidationError(
 92                [
 93                    _("Failed to validate blueprint"),
 94                    *[f"- {x.event}" for x in logs],
 95                ]
 96            )
 97        return content
 98
 99    def validate(self, attrs: dict) -> dict:
100        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
101            raise ValidationError(_("Either path or content must be set."))
102        return super().validate(attrs)
103
104    class Meta:
105        model = BlueprintInstance
106        fields = [
107            "pk",
108            "name",
109            "path",
110            "context",
111            "last_applied",
112            "last_applied_hash",
113            "status",
114            "enabled",
115            "managed_models",
116            "metadata",
117            "content",
118        ]
119        extra_kwargs = {
120            "status": {"read_only": True},
121            "last_applied": {"read_only": True},
122            "last_applied_hash": {"read_only": True},
123            "managed_models": {"read_only": True},
124            "metadata": {"read_only": True},
125        }

Info about a single blueprint instance file

def validate_path(self, path: str) -> str:
75    def validate_path(self, path: str) -> str:
76        """Ensure the path (if set) specified is retrievable"""
77        if path == "" or path.startswith(OCI_PREFIX):
78            return path
79        files: list[dict] = get_blueprints()
80        if path not in [file["path"] for file in files]:
81            raise ValidationError(_("Blueprint file does not exist"))
82        return path

Ensure the path (if set) specified is retrievable

def validate_content(self, content: str) -> str:
84    def validate_content(self, content: str) -> str:
85        """Ensure content (if set) is a valid blueprint"""
86        if content == "":
87            return content
88        context = self.instance.context if self.instance else {}
89        valid, logs = Importer.from_string(content, context).validate()
90        if not valid:
91            raise ValidationError(
92                [
93                    _("Failed to validate blueprint"),
94                    *[f"- {x.event}" for x in logs],
95                ]
96            )
97        return content

Ensure content (if set) is a valid blueprint

def validate(self, attrs: dict) -> dict:
 99    def validate(self, attrs: dict) -> dict:
100        if attrs.get("path", "") == "" and attrs.get("content", "") == "":
101            raise ValidationError(_("Either path or content must be set."))
102        return super().validate(attrs)
class BlueprintInstanceSerializer.Meta:
104    class Meta:
105        model = BlueprintInstance
106        fields = [
107            "pk",
108            "name",
109            "path",
110            "context",
111            "last_applied",
112            "last_applied_hash",
113            "status",
114            "enabled",
115            "managed_models",
116            "metadata",
117            "content",
118        ]
119        extra_kwargs = {
120            "status": {"read_only": True},
121            "last_applied": {"read_only": True},
122            "last_applied_hash": {"read_only": True},
123            "managed_models": {"read_only": True},
124            "metadata": {"read_only": True},
125        }
fields = ['pk', 'name', 'path', 'context', 'last_applied', 'last_applied_hash', 'status', 'enabled', 'managed_models', 'metadata', 'content']
extra_kwargs = {'status': {'read_only': True}, 'last_applied': {'read_only': True}, 'last_applied_hash': {'read_only': True}, 'managed_models': {'read_only': True}, 'metadata': {'read_only': True}}
def check_blueprint_perms( blueprint: authentik.blueprints.v1.common.Blueprint, user: authentik.core.models.User, explicit_action: str | None = None):
128def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None):
129    """Check for individual permissions for each model in a blueprint"""
130    for entry in blueprint.entries:
131        full_model = entry.get_model(blueprint)
132        app, __, model = full_model.partition(".")
133        perms = [
134            f"{app}.add_{model}",
135            f"{app}.change_{model}",
136            f"{app}.delete_{model}",
137        ]
138        if explicit_action:
139            perms = [f"{app}.{explicit_action}_{model}"]
140        for perm in perms:
141            if not user.has_perm(perm):
142                raise PermissionDenied(
143                    {
144                        entry.id: _(
145                            "User lacks permission to create {model}".format_map(
146                                {
147                                    "model": full_model,
148                                }
149                            )
150                        )
151                    }
152                )

Check for individual permissions for each model in a blueprint

class BlueprintInstanceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
155class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet):
156    """Blueprint instances"""
157
158    serializer_class = BlueprintInstanceSerializer
159    queryset = BlueprintInstance.objects.all()
160    search_fields = ["name", "path"]
161    filterset_fields = ["name", "path"]
162    ordering = ["name"]
163
164    class BlueprintImportResultSerializer(PassiveSerializer):
165        """Logs of an attempted blueprint import"""
166
167        logs = LogEventSerializer(many=True, read_only=True)
168        success = BooleanField(read_only=True)
169
170    @extend_schema(
171        responses={
172            200: ListSerializer(
173                child=inline_serializer(
174                    "BlueprintFile",
175                    fields={
176                        "path": CharField(),
177                        "last_m": DateTimeField(),
178                        "hash": CharField(),
179                        "meta": MetadataSerializer(required=False, read_only=True),
180                    },
181                )
182            )
183        }
184    )
185    @action(detail=False, pagination_class=None, filter_backends=[])
186    def available(self, request: Request) -> Response:
187        """Get blueprints"""
188        files: list[dict] = get_blueprints()
189        return Response(files)
190
191    @permission_required("authentik_blueprints.view_blueprintinstance")
192    @extend_schema(
193        request=None,
194        responses={
195            200: BlueprintInstanceSerializer(),
196        },
197    )
198    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
199    def apply(self, request: Request, *args, **kwargs) -> Response:
200        """Apply a blueprint"""
201        blueprint = self.get_object()
202        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
203        return self.retrieve(request, *args, **kwargs)
204
205    @extend_schema(
206        request={"multipart/form-data": BlueprintUploadSerializer},
207        responses={
208            204: BlueprintImportResultSerializer,
209            400: BlueprintImportResultSerializer,
210        },
211    )
212    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
213    @validate(
214        BlueprintUploadSerializer,
215    )
216    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
217        """Import blueprint from .yaml file and apply it once, without creating an instance"""
218        string_contents = ""
219        if body.validated_data.get("file"):
220            file = cast(InMemoryUploadedFile, body.validated_data["file"])
221            string_contents = file.read().decode()
222        elif body.validated_data.get("path"):
223            string_contents = BlueprintInstance(
224                path=body.validated_data.get("path")
225            ).retrieve_file()
226        else:
227            raise ValidationError("Either path or file must be set")
228        importer = Importer.from_string(string_contents)
229
230        check_blueprint_perms(importer.blueprint, request.user)
231
232        valid, logs = importer.validate()
233
234        import_response = self.BlueprintImportResultSerializer(
235            data={
236                "logs": [],
237                "success": False,
238            }
239        )
240        import_response.is_valid(raise_exception=True)
241
242        import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs]
243        import_response.initial_data["success"] = valid
244        import_response.is_valid()
245        if not valid:
246            return Response(data=import_response.initial_data, status=200)
247
248        successful = importer.apply()
249        import_response.initial_data["success"] = successful
250        import_response.is_valid()
251        if not successful:
252            return Response(data=import_response.initial_data, status=200)
253        return Response(data=import_response.initial_data, status=200)

Blueprint instances

serializer_class = <class 'BlueprintInstanceSerializer'>
queryset = <QuerySet []>
search_fields = ['name', 'path']
filterset_fields = ['name', 'path']
ordering = ['name']
@extend_schema(responses={200: ListSerializer(child=inline_serializer('BlueprintFile', fields={'path': CharField(), 'last_m': DateTimeField(), 'hash': CharField(), 'meta': MetadataSerializer(required=False, read_only=True)}))})
@action(detail=False, pagination_class=None, filter_backends=[])
def available( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
170    @extend_schema(
171        responses={
172            200: ListSerializer(
173                child=inline_serializer(
174                    "BlueprintFile",
175                    fields={
176                        "path": CharField(),
177                        "last_m": DateTimeField(),
178                        "hash": CharField(),
179                        "meta": MetadataSerializer(required=False, read_only=True),
180                    },
181                )
182            )
183        }
184    )
185    @action(detail=False, pagination_class=None, filter_backends=[])
186    def available(self, request: Request) -> Response:
187        """Get blueprints"""
188        files: list[dict] = get_blueprints()
189        return Response(files)

Get blueprints

@permission_required('authentik_blueprints.view_blueprintinstance')
@extend_schema(request=None, responses={200: BlueprintInstanceSerializer()})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
def apply( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
191    @permission_required("authentik_blueprints.view_blueprintinstance")
192    @extend_schema(
193        request=None,
194        responses={
195            200: BlueprintInstanceSerializer(),
196        },
197    )
198    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
199    def apply(self, request: Request, *args, **kwargs) -> Response:
200        """Apply a blueprint"""
201        blueprint = self.get_object()
202        apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint)
203        return self.retrieve(request, *args, **kwargs)

Apply a blueprint

@extend_schema(request={'multipart/form-data': BlueprintUploadSerializer}, responses={204: BlueprintImportResultSerializer, 400: BlueprintImportResultSerializer})
@action(url_path='import', detail=False, methods=['POST'], parser_classes=(MultiPartParser,))
@validate(BlueprintUploadSerializer)
def import_( self, request: rest_framework.request.Request, body: BlueprintUploadSerializer) -> rest_framework.response.Response:
205    @extend_schema(
206        request={"multipart/form-data": BlueprintUploadSerializer},
207        responses={
208            204: BlueprintImportResultSerializer,
209            400: BlueprintImportResultSerializer,
210        },
211    )
212    @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,))
213    @validate(
214        BlueprintUploadSerializer,
215    )
216    def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response:
217        """Import blueprint from .yaml file and apply it once, without creating an instance"""
218        string_contents = ""
219        if body.validated_data.get("file"):
220            file = cast(InMemoryUploadedFile, body.validated_data["file"])
221            string_contents = file.read().decode()
222        elif body.validated_data.get("path"):
223            string_contents = BlueprintInstance(
224                path=body.validated_data.get("path")
225            ).retrieve_file()
226        else:
227            raise ValidationError("Either path or file must be set")
228        importer = Importer.from_string(string_contents)
229
230        check_blueprint_perms(importer.blueprint, request.user)
231
232        valid, logs = importer.validate()
233
234        import_response = self.BlueprintImportResultSerializer(
235            data={
236                "logs": [],
237                "success": False,
238            }
239        )
240        import_response.is_valid(raise_exception=True)
241
242        import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs]
243        import_response.initial_data["success"] = valid
244        import_response.is_valid()
245        if not valid:
246            return Response(data=import_response.initial_data, status=200)
247
248        successful = importer.apply()
249        import_response.initial_data["success"] = successful
250        import_response.is_valid()
251        if not successful:
252            return Response(data=import_response.initial_data, status=200)
253        return Response(data=import_response.initial_data, status=200)

Import blueprint from .yaml file and apply it once, without creating an instance

name = None
description = None
suffix = None
detail = None
basename = None
class BlueprintInstanceViewSet.BlueprintImportResultSerializer(authentik.core.api.utils.PassiveSerializer):
164    class BlueprintImportResultSerializer(PassiveSerializer):
165        """Logs of an attempted blueprint import"""
166
167        logs = LogEventSerializer(many=True, read_only=True)
168        success = BooleanField(read_only=True)

Logs of an attempted blueprint import

logs
success