authentik.blueprints.api
Serializer mixin for managed models
1"""Serializer mixin for managed models""" 2 3from json import JSONDecodeError, loads 4from typing import cast 5 6from django.conf import settings 7from django.core.files.uploadedfile import InMemoryUploadedFile 8from django.utils.translation import gettext_lazy as _ 9from drf_spectacular.utils import extend_schema, inline_serializer 10from rest_framework.decorators import action 11from rest_framework.exceptions import PermissionDenied, ValidationError 12from rest_framework.fields import ( 13 BooleanField, 14 CharField, 15 DateTimeField, 16 FileField, 17) 18from rest_framework.parsers import MultiPartParser 19from rest_framework.request import Request 20from rest_framework.response import Response 21from rest_framework.serializers import ListSerializer 22from rest_framework.viewsets import ModelViewSet 23 24from authentik.api.validation import validate 25from authentik.blueprints.models import BlueprintInstance 26from authentik.blueprints.v1.common import Blueprint 27from authentik.blueprints.v1.importer import Importer 28from authentik.blueprints.v1.oci import OCI_PREFIX 29from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_find_dict 30from authentik.core.api.used_by import UsedByMixin 31from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer 32from authentik.core.models import User 33from authentik.events.logs import LogEventSerializer 34from authentik.rbac.decorators import permission_required 35 36 37def get_blueprints(): 38 if settings.DEBUG: 39 return blueprints_find_dict() 40 return blueprints_find_dict.send().get_result(block=True) 41 42 43class BlueprintUploadSerializer(PassiveSerializer): 44 """Serializer to upload file""" 45 46 file = FileField(required=False) 47 path = CharField(required=False) 48 context = CharField(required=False, allow_blank=True) 49 50 def validate_path(self, path: str) -> str: 51 """Ensure the path (if set) specified is retrievable""" 52 if path == "": 53 return path 54 files: list[dict] = get_blueprints() 55 if path not in [file["path"] for file in files]: 56 raise ValidationError(_("Blueprint file does not exist")) 57 return path 58 59 def validate_context(self, context: str) -> dict: 60 """Parse context as a JSON object""" 61 if not context: 62 return {} 63 try: 64 parsed = loads(context) 65 except JSONDecodeError as exc: 66 raise ValidationError(_("Context must be valid JSON")) from exc 67 if not isinstance(parsed, dict): 68 raise ValidationError(_("Context must be a JSON object")) 69 return parsed 70 71 72class ManagedSerializer: 73 """Managed Serializer""" 74 75 managed = CharField(read_only=True, allow_null=True) 76 77 78class MetadataSerializer(PassiveSerializer): 79 """Serializer for blueprint metadata""" 80 81 name = CharField() 82 labels = JSONDictField() 83 84 85class BlueprintInstanceSerializer(ModelSerializer): 86 """Info about a single blueprint instance file""" 87 88 def validate_path(self, path: str) -> str: 89 """Ensure the path (if set) specified is retrievable""" 90 if path == "" or path.startswith(OCI_PREFIX): 91 return path 92 files: list[dict] = get_blueprints() 93 if path not in [file["path"] for file in files]: 94 raise ValidationError(_("Blueprint file does not exist")) 95 return path 96 97 def validate_content(self, content: str) -> str: 98 """Ensure content (if set) is a valid blueprint""" 99 if content == "": 100 return content 101 context = self.instance.context if self.instance else {} 102 valid, logs = Importer.from_string(content, context).validate() 103 if not valid: 104 raise ValidationError( 105 [ 106 _("Failed to validate blueprint"), 107 *[f"- {x.event}" for x in logs], 108 ] 109 ) 110 return content 111 112 def validate(self, attrs: dict) -> dict: 113 if attrs.get("path", "") == "" and attrs.get("content", "") == "": 114 raise ValidationError(_("Either path or content must be set.")) 115 return super().validate(attrs) 116 117 class Meta: 118 model = BlueprintInstance 119 fields = [ 120 "pk", 121 "name", 122 "path", 123 "context", 124 "last_applied", 125 "last_applied_hash", 126 "status", 127 "enabled", 128 "managed_models", 129 "metadata", 130 "content", 131 ] 132 extra_kwargs = { 133 "status": {"read_only": True}, 134 "last_applied": {"read_only": True}, 135 "last_applied_hash": {"read_only": True}, 136 "managed_models": {"read_only": True}, 137 "metadata": {"read_only": True}, 138 } 139 140 141def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None): 142 """Check for individual permissions for each model in a blueprint""" 143 for entry in blueprint.iter_entries(): 144 full_model = entry.get_model(blueprint) 145 app, __, model = full_model.partition(".") 146 perms = [ 147 f"{app}.add_{model}", 148 f"{app}.change_{model}", 149 f"{app}.delete_{model}", 150 ] 151 if explicit_action: 152 perms = [f"{app}.{explicit_action}_{model}"] 153 for perm in perms: 154 if not user.has_perm(perm): 155 raise PermissionDenied( 156 { 157 entry.id: _( 158 "User lacks permission to create {model}".format_map( 159 { 160 "model": full_model, 161 } 162 ) 163 ) 164 } 165 ) 166 167 168class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet): 169 """Blueprint instances""" 170 171 serializer_class = BlueprintInstanceSerializer 172 queryset = BlueprintInstance.objects.all() 173 search_fields = ["name", "path"] 174 filterset_fields = ["name", "path"] 175 ordering = ["name"] 176 177 class BlueprintImportResultSerializer(PassiveSerializer): 178 """Logs of an attempted blueprint import""" 179 180 logs = LogEventSerializer(many=True, read_only=True) 181 success = BooleanField(read_only=True) 182 183 @extend_schema( 184 responses={ 185 200: ListSerializer( 186 child=inline_serializer( 187 "BlueprintFile", 188 fields={ 189 "path": CharField(), 190 "last_m": DateTimeField(), 191 "hash": CharField(), 192 "meta": MetadataSerializer(required=False, read_only=True), 193 }, 194 ) 195 ) 196 } 197 ) 198 @action(detail=False, pagination_class=None, filter_backends=[]) 199 def available(self, request: Request) -> Response: 200 """Get blueprints""" 201 files: list[dict] = get_blueprints() 202 return Response(files) 203 204 @permission_required("authentik_blueprints.view_blueprintinstance") 205 @extend_schema( 206 request=None, 207 responses={ 208 200: BlueprintInstanceSerializer(), 209 }, 210 ) 211 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 212 def apply(self, request: Request, *args, **kwargs) -> Response: 213 """Apply a blueprint""" 214 blueprint = self.get_object() 215 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 216 return self.retrieve(request, *args, **kwargs) 217 218 @extend_schema( 219 request={"multipart/form-data": BlueprintUploadSerializer}, 220 responses={200: BlueprintImportResultSerializer}, 221 ) 222 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 223 @validate( 224 BlueprintUploadSerializer, 225 ) 226 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 227 """Import blueprint from .yaml file and apply it once, without creating an instance""" 228 string_contents = "" 229 if body.validated_data.get("file"): 230 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 231 string_contents = file.read().decode() 232 elif body.validated_data.get("path"): 233 string_contents = BlueprintInstance( 234 path=body.validated_data.get("path") 235 ).retrieve_file() 236 else: 237 raise ValidationError("Either path or file must be set") 238 context = body.validated_data.get("context") or {} 239 importer = Importer.from_string(string_contents, context) 240 241 check_blueprint_perms(importer.blueprint, request.user) 242 243 valid, logs = importer.validate() 244 245 import_response = self.BlueprintImportResultSerializer( 246 data={ 247 "logs": [LogEventSerializer(log).data for log in logs], 248 "success": valid, 249 } 250 ) 251 import_response.is_valid(raise_exception=True) 252 253 if valid: 254 import_response.initial_data["success"] = importer.apply() 255 import_response.is_valid() 256 return Response(data=import_response.initial_data, status=200)
def
get_blueprints():
44class BlueprintUploadSerializer(PassiveSerializer): 45 """Serializer to upload file""" 46 47 file = FileField(required=False) 48 path = CharField(required=False) 49 context = CharField(required=False, allow_blank=True) 50 51 def validate_path(self, path: str) -> str: 52 """Ensure the path (if set) specified is retrievable""" 53 if path == "": 54 return path 55 files: list[dict] = get_blueprints() 56 if path not in [file["path"] for file in files]: 57 raise ValidationError(_("Blueprint file does not exist")) 58 return path 59 60 def validate_context(self, context: str) -> dict: 61 """Parse context as a JSON object""" 62 if not context: 63 return {} 64 try: 65 parsed = loads(context) 66 except JSONDecodeError as exc: 67 raise ValidationError(_("Context must be valid JSON")) from exc 68 if not isinstance(parsed, dict): 69 raise ValidationError(_("Context must be a JSON object")) 70 return parsed
Serializer to upload file
context
614 @property 615 def context(self): 616 """ 617 Returns the context as passed to the root serializer on initialization. 618 """ 619 return getattr(self.root, '_context', {})
Returns the context as passed to the root serializer on initialization.
def
validate_path(self, path: str) -> str:
51 def validate_path(self, path: str) -> str: 52 """Ensure the path (if set) specified is retrievable""" 53 if path == "": 54 return path 55 files: list[dict] = get_blueprints() 56 if path not in [file["path"] for file in files]: 57 raise ValidationError(_("Blueprint file does not exist")) 58 return path
Ensure the path (if set) specified is retrievable
def
validate_context(self, context: str) -> dict:
60 def validate_context(self, context: str) -> dict: 61 """Parse context as a JSON object""" 62 if not context: 63 return {} 64 try: 65 parsed = loads(context) 66 except JSONDecodeError as exc: 67 raise ValidationError(_("Context must be valid JSON")) from exc 68 if not isinstance(parsed, dict): 69 raise ValidationError(_("Context must be a JSON object")) 70 return parsed
Parse context as a JSON object
Inherited Members
class
ManagedSerializer:
73class ManagedSerializer: 74 """Managed Serializer""" 75 76 managed = CharField(read_only=True, allow_null=True)
Managed Serializer
79class MetadataSerializer(PassiveSerializer): 80 """Serializer for blueprint metadata""" 81 82 name = CharField() 83 labels = JSONDictField()
Serializer for blueprint metadata
Inherited Members
86class BlueprintInstanceSerializer(ModelSerializer): 87 """Info about a single blueprint instance file""" 88 89 def validate_path(self, path: str) -> str: 90 """Ensure the path (if set) specified is retrievable""" 91 if path == "" or path.startswith(OCI_PREFIX): 92 return path 93 files: list[dict] = get_blueprints() 94 if path not in [file["path"] for file in files]: 95 raise ValidationError(_("Blueprint file does not exist")) 96 return path 97 98 def validate_content(self, content: str) -> str: 99 """Ensure content (if set) is a valid blueprint""" 100 if content == "": 101 return content 102 context = self.instance.context if self.instance else {} 103 valid, logs = Importer.from_string(content, context).validate() 104 if not valid: 105 raise ValidationError( 106 [ 107 _("Failed to validate blueprint"), 108 *[f"- {x.event}" for x in logs], 109 ] 110 ) 111 return content 112 113 def validate(self, attrs: dict) -> dict: 114 if attrs.get("path", "") == "" and attrs.get("content", "") == "": 115 raise ValidationError(_("Either path or content must be set.")) 116 return super().validate(attrs) 117 118 class Meta: 119 model = BlueprintInstance 120 fields = [ 121 "pk", 122 "name", 123 "path", 124 "context", 125 "last_applied", 126 "last_applied_hash", 127 "status", 128 "enabled", 129 "managed_models", 130 "metadata", 131 "content", 132 ] 133 extra_kwargs = { 134 "status": {"read_only": True}, 135 "last_applied": {"read_only": True}, 136 "last_applied_hash": {"read_only": True}, 137 "managed_models": {"read_only": True}, 138 "metadata": {"read_only": True}, 139 }
Info about a single blueprint instance file
def
validate_path(self, path: str) -> str:
89 def validate_path(self, path: str) -> str: 90 """Ensure the path (if set) specified is retrievable""" 91 if path == "" or path.startswith(OCI_PREFIX): 92 return path 93 files: list[dict] = get_blueprints() 94 if path not in [file["path"] for file in files]: 95 raise ValidationError(_("Blueprint file does not exist")) 96 return path
Ensure the path (if set) specified is retrievable
def
validate_content(self, content: str) -> str:
98 def validate_content(self, content: str) -> str: 99 """Ensure content (if set) is a valid blueprint""" 100 if content == "": 101 return content 102 context = self.instance.context if self.instance else {} 103 valid, logs = Importer.from_string(content, context).validate() 104 if not valid: 105 raise ValidationError( 106 [ 107 _("Failed to validate blueprint"), 108 *[f"- {x.event}" for x in logs], 109 ] 110 ) 111 return content
Ensure content (if set) is a valid blueprint
Inherited Members
class
BlueprintInstanceSerializer.Meta:
118 class Meta: 119 model = BlueprintInstance 120 fields = [ 121 "pk", 122 "name", 123 "path", 124 "context", 125 "last_applied", 126 "last_applied_hash", 127 "status", 128 "enabled", 129 "managed_models", 130 "metadata", 131 "content", 132 ] 133 extra_kwargs = { 134 "status": {"read_only": True}, 135 "last_applied": {"read_only": True}, 136 "last_applied_hash": {"read_only": True}, 137 "managed_models": {"read_only": True}, 138 "metadata": {"read_only": True}, 139 }
model =
<class 'authentik.blueprints.models.BlueprintInstance'>
def
check_blueprint_perms( blueprint: authentik.blueprints.v1.common.Blueprint, user: authentik.core.models.User, explicit_action: str | None = None):
142def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None): 143 """Check for individual permissions for each model in a blueprint""" 144 for entry in blueprint.iter_entries(): 145 full_model = entry.get_model(blueprint) 146 app, __, model = full_model.partition(".") 147 perms = [ 148 f"{app}.add_{model}", 149 f"{app}.change_{model}", 150 f"{app}.delete_{model}", 151 ] 152 if explicit_action: 153 perms = [f"{app}.{explicit_action}_{model}"] 154 for perm in perms: 155 if not user.has_perm(perm): 156 raise PermissionDenied( 157 { 158 entry.id: _( 159 "User lacks permission to create {model}".format_map( 160 { 161 "model": full_model, 162 } 163 ) 164 ) 165 } 166 )
Check for individual permissions for each model in a blueprint
class
BlueprintInstanceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
169class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet): 170 """Blueprint instances""" 171 172 serializer_class = BlueprintInstanceSerializer 173 queryset = BlueprintInstance.objects.all() 174 search_fields = ["name", "path"] 175 filterset_fields = ["name", "path"] 176 ordering = ["name"] 177 178 class BlueprintImportResultSerializer(PassiveSerializer): 179 """Logs of an attempted blueprint import""" 180 181 logs = LogEventSerializer(many=True, read_only=True) 182 success = BooleanField(read_only=True) 183 184 @extend_schema( 185 responses={ 186 200: ListSerializer( 187 child=inline_serializer( 188 "BlueprintFile", 189 fields={ 190 "path": CharField(), 191 "last_m": DateTimeField(), 192 "hash": CharField(), 193 "meta": MetadataSerializer(required=False, read_only=True), 194 }, 195 ) 196 ) 197 } 198 ) 199 @action(detail=False, pagination_class=None, filter_backends=[]) 200 def available(self, request: Request) -> Response: 201 """Get blueprints""" 202 files: list[dict] = get_blueprints() 203 return Response(files) 204 205 @permission_required("authentik_blueprints.view_blueprintinstance") 206 @extend_schema( 207 request=None, 208 responses={ 209 200: BlueprintInstanceSerializer(), 210 }, 211 ) 212 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 213 def apply(self, request: Request, *args, **kwargs) -> Response: 214 """Apply a blueprint""" 215 blueprint = self.get_object() 216 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 217 return self.retrieve(request, *args, **kwargs) 218 219 @extend_schema( 220 request={"multipart/form-data": BlueprintUploadSerializer}, 221 responses={200: BlueprintImportResultSerializer}, 222 ) 223 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 224 @validate( 225 BlueprintUploadSerializer, 226 ) 227 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 228 """Import blueprint from .yaml file and apply it once, without creating an instance""" 229 string_contents = "" 230 if body.validated_data.get("file"): 231 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 232 string_contents = file.read().decode() 233 elif body.validated_data.get("path"): 234 string_contents = BlueprintInstance( 235 path=body.validated_data.get("path") 236 ).retrieve_file() 237 else: 238 raise ValidationError("Either path or file must be set") 239 context = body.validated_data.get("context") or {} 240 importer = Importer.from_string(string_contents, context) 241 242 check_blueprint_perms(importer.blueprint, request.user) 243 244 valid, logs = importer.validate() 245 246 import_response = self.BlueprintImportResultSerializer( 247 data={ 248 "logs": [LogEventSerializer(log).data for log in logs], 249 "success": valid, 250 } 251 ) 252 import_response.is_valid(raise_exception=True) 253 254 if valid: 255 import_response.initial_data["success"] = importer.apply() 256 import_response.is_valid() 257 return Response(data=import_response.initial_data, status=200)
Blueprint instances
serializer_class =
<class 'BlueprintInstanceSerializer'>
@extend_schema(responses={200: ListSerializer(child=inline_serializer('BlueprintFile', fields={'path': CharField(), 'last_m': DateTimeField(), 'hash': CharField(), 'meta': MetadataSerializer(required=False, read_only=True)}))})
@action(detail=False, pagination_class=None, filter_backends=[])
def
available( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
184 @extend_schema( 185 responses={ 186 200: ListSerializer( 187 child=inline_serializer( 188 "BlueprintFile", 189 fields={ 190 "path": CharField(), 191 "last_m": DateTimeField(), 192 "hash": CharField(), 193 "meta": MetadataSerializer(required=False, read_only=True), 194 }, 195 ) 196 ) 197 } 198 ) 199 @action(detail=False, pagination_class=None, filter_backends=[]) 200 def available(self, request: Request) -> Response: 201 """Get blueprints""" 202 files: list[dict] = get_blueprints() 203 return Response(files)
Get blueprints
@extend_schema(request=None, responses={200: BlueprintInstanceSerializer()})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
def
apply( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
205 @permission_required("authentik_blueprints.view_blueprintinstance") 206 @extend_schema( 207 request=None, 208 responses={ 209 200: BlueprintInstanceSerializer(), 210 }, 211 ) 212 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 213 def apply(self, request: Request, *args, **kwargs) -> Response: 214 """Apply a blueprint""" 215 blueprint = self.get_object() 216 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 217 return self.retrieve(request, *args, **kwargs)
Apply a blueprint
@extend_schema(request={'multipart/form-data': BlueprintUploadSerializer}, responses={200: BlueprintImportResultSerializer})
@action(url_path='import', detail=False, methods=['POST'], parser_classes=(MultiPartParser,))
@validate(BlueprintUploadSerializer)
def
import_( self, request: rest_framework.request.Request, body: BlueprintUploadSerializer) -> rest_framework.response.Response:
219 @extend_schema( 220 request={"multipart/form-data": BlueprintUploadSerializer}, 221 responses={200: BlueprintImportResultSerializer}, 222 ) 223 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 224 @validate( 225 BlueprintUploadSerializer, 226 ) 227 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 228 """Import blueprint from .yaml file and apply it once, without creating an instance""" 229 string_contents = "" 230 if body.validated_data.get("file"): 231 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 232 string_contents = file.read().decode() 233 elif body.validated_data.get("path"): 234 string_contents = BlueprintInstance( 235 path=body.validated_data.get("path") 236 ).retrieve_file() 237 else: 238 raise ValidationError("Either path or file must be set") 239 context = body.validated_data.get("context") or {} 240 importer = Importer.from_string(string_contents, context) 241 242 check_blueprint_perms(importer.blueprint, request.user) 243 244 valid, logs = importer.validate() 245 246 import_response = self.BlueprintImportResultSerializer( 247 data={ 248 "logs": [LogEventSerializer(log).data for log in logs], 249 "success": valid, 250 } 251 ) 252 import_response.is_valid(raise_exception=True) 253 254 if valid: 255 import_response.initial_data["success"] = importer.apply() 256 import_response.is_valid() 257 return Response(data=import_response.initial_data, status=200)
Import blueprint from .yaml file and apply it once, without creating an instance
Inherited Members
class
BlueprintInstanceViewSet.BlueprintImportResultSerializer(authentik.core.api.utils.PassiveSerializer):
178 class BlueprintImportResultSerializer(PassiveSerializer): 179 """Logs of an attempted blueprint import""" 180 181 logs = LogEventSerializer(many=True, read_only=True) 182 success = BooleanField(read_only=True)
Logs of an attempted blueprint import