authentik.blueprints.api
Serializer mixin for managed models
1"""Serializer mixin for managed models""" 2 3from typing import cast 4 5from django.conf import settings 6from django.core.files.uploadedfile import InMemoryUploadedFile 7from django.utils.translation import gettext_lazy as _ 8from drf_spectacular.utils import extend_schema, inline_serializer 9from rest_framework.decorators import action 10from rest_framework.exceptions import PermissionDenied, ValidationError 11from rest_framework.fields import ( 12 BooleanField, 13 CharField, 14 DateTimeField, 15 FileField, 16) 17from rest_framework.parsers import MultiPartParser 18from rest_framework.request import Request 19from rest_framework.response import Response 20from rest_framework.serializers import ListSerializer 21from rest_framework.viewsets import ModelViewSet 22 23from authentik.api.validation import validate 24from authentik.blueprints.models import BlueprintInstance 25from authentik.blueprints.v1.common import Blueprint 26from authentik.blueprints.v1.importer import Importer 27from authentik.blueprints.v1.oci import OCI_PREFIX 28from authentik.blueprints.v1.tasks import apply_blueprint, blueprints_find_dict 29from authentik.core.api.used_by import UsedByMixin 30from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer 31from authentik.core.models import User 32from authentik.events.logs import LogEventSerializer 33from authentik.rbac.decorators import permission_required 34 35 36def get_blueprints(): 37 if settings.DEBUG: 38 return blueprints_find_dict() 39 return blueprints_find_dict.send().get_result(block=True) 40 41 42class BlueprintUploadSerializer(PassiveSerializer): 43 """Serializer to upload file""" 44 45 file = FileField(required=False) 46 path = CharField(required=False) 47 48 def validate_path(self, path: str) -> str: 49 """Ensure the path (if set) specified is retrievable""" 50 if path == "": 51 return path 52 files: list[dict] = get_blueprints() 53 if path not in [file["path"] for file in files]: 54 raise ValidationError(_("Blueprint file does not exist")) 55 return path 56 57 58class ManagedSerializer: 59 """Managed Serializer""" 60 61 managed = CharField(read_only=True, allow_null=True) 62 63 64class MetadataSerializer(PassiveSerializer): 65 """Serializer for blueprint metadata""" 66 67 name = CharField() 68 labels = JSONDictField() 69 70 71class BlueprintInstanceSerializer(ModelSerializer): 72 """Info about a single blueprint instance file""" 73 74 def validate_path(self, path: str) -> str: 75 """Ensure the path (if set) specified is retrievable""" 76 if path == "" or path.startswith(OCI_PREFIX): 77 return path 78 files: list[dict] = get_blueprints() 79 if path not in [file["path"] for file in files]: 80 raise ValidationError(_("Blueprint file does not exist")) 81 return path 82 83 def validate_content(self, content: str) -> str: 84 """Ensure content (if set) is a valid blueprint""" 85 if content == "": 86 return content 87 context = self.instance.context if self.instance else {} 88 valid, logs = Importer.from_string(content, context).validate() 89 if not valid: 90 raise ValidationError( 91 [ 92 _("Failed to validate blueprint"), 93 *[f"- {x.event}" for x in logs], 94 ] 95 ) 96 return content 97 98 def validate(self, attrs: dict) -> dict: 99 if attrs.get("path", "") == "" and attrs.get("content", "") == "": 100 raise ValidationError(_("Either path or content must be set.")) 101 return super().validate(attrs) 102 103 class Meta: 104 model = BlueprintInstance 105 fields = [ 106 "pk", 107 "name", 108 "path", 109 "context", 110 "last_applied", 111 "last_applied_hash", 112 "status", 113 "enabled", 114 "managed_models", 115 "metadata", 116 "content", 117 ] 118 extra_kwargs = { 119 "status": {"read_only": True}, 120 "last_applied": {"read_only": True}, 121 "last_applied_hash": {"read_only": True}, 122 "managed_models": {"read_only": True}, 123 "metadata": {"read_only": True}, 124 } 125 126 127def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None): 128 """Check for individual permissions for each model in a blueprint""" 129 for entry in blueprint.entries: 130 full_model = entry.get_model(blueprint) 131 app, __, model = full_model.partition(".") 132 perms = [ 133 f"{app}.add_{model}", 134 f"{app}.change_{model}", 135 f"{app}.delete_{model}", 136 ] 137 if explicit_action: 138 perms = [f"{app}.{explicit_action}_{model}"] 139 for perm in perms: 140 if not user.has_perm(perm): 141 raise PermissionDenied( 142 { 143 entry.id: _( 144 "User lacks permission to create {model}".format_map( 145 { 146 "model": full_model, 147 } 148 ) 149 ) 150 } 151 ) 152 153 154class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet): 155 """Blueprint instances""" 156 157 serializer_class = BlueprintInstanceSerializer 158 queryset = BlueprintInstance.objects.all() 159 search_fields = ["name", "path"] 160 filterset_fields = ["name", "path"] 161 ordering = ["name"] 162 163 class BlueprintImportResultSerializer(PassiveSerializer): 164 """Logs of an attempted blueprint import""" 165 166 logs = LogEventSerializer(many=True, read_only=True) 167 success = BooleanField(read_only=True) 168 169 @extend_schema( 170 responses={ 171 200: ListSerializer( 172 child=inline_serializer( 173 "BlueprintFile", 174 fields={ 175 "path": CharField(), 176 "last_m": DateTimeField(), 177 "hash": CharField(), 178 "meta": MetadataSerializer(required=False, read_only=True), 179 }, 180 ) 181 ) 182 } 183 ) 184 @action(detail=False, pagination_class=None, filter_backends=[]) 185 def available(self, request: Request) -> Response: 186 """Get blueprints""" 187 files: list[dict] = get_blueprints() 188 return Response(files) 189 190 @permission_required("authentik_blueprints.view_blueprintinstance") 191 @extend_schema( 192 request=None, 193 responses={ 194 200: BlueprintInstanceSerializer(), 195 }, 196 ) 197 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 198 def apply(self, request: Request, *args, **kwargs) -> Response: 199 """Apply a blueprint""" 200 blueprint = self.get_object() 201 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 202 return self.retrieve(request, *args, **kwargs) 203 204 @extend_schema( 205 request={"multipart/form-data": BlueprintUploadSerializer}, 206 responses={ 207 204: BlueprintImportResultSerializer, 208 400: BlueprintImportResultSerializer, 209 }, 210 ) 211 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 212 @validate( 213 BlueprintUploadSerializer, 214 ) 215 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 216 """Import blueprint from .yaml file and apply it once, without creating an instance""" 217 string_contents = "" 218 if body.validated_data.get("file"): 219 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 220 string_contents = file.read().decode() 221 elif body.validated_data.get("path"): 222 string_contents = BlueprintInstance( 223 path=body.validated_data.get("path") 224 ).retrieve_file() 225 else: 226 raise ValidationError("Either path or file must be set") 227 importer = Importer.from_string(string_contents) 228 229 check_blueprint_perms(importer.blueprint, request.user) 230 231 valid, logs = importer.validate() 232 233 import_response = self.BlueprintImportResultSerializer( 234 data={ 235 "logs": [], 236 "success": False, 237 } 238 ) 239 import_response.is_valid(raise_exception=True) 240 241 import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs] 242 import_response.initial_data["success"] = valid 243 import_response.is_valid() 244 if not valid: 245 return Response(data=import_response.initial_data, status=200) 246 247 successful = importer.apply() 248 import_response.initial_data["success"] = successful 249 import_response.is_valid() 250 if not successful: 251 return Response(data=import_response.initial_data, status=200) 252 return Response(data=import_response.initial_data, status=200)
def
get_blueprints():
43class BlueprintUploadSerializer(PassiveSerializer): 44 """Serializer to upload file""" 45 46 file = FileField(required=False) 47 path = CharField(required=False) 48 49 def validate_path(self, path: str) -> str: 50 """Ensure the path (if set) specified is retrievable""" 51 if path == "": 52 return path 53 files: list[dict] = get_blueprints() 54 if path not in [file["path"] for file in files]: 55 raise ValidationError(_("Blueprint file does not exist")) 56 return path
Serializer to upload file
def
validate_path(self, path: str) -> str:
49 def validate_path(self, path: str) -> str: 50 """Ensure the path (if set) specified is retrievable""" 51 if path == "": 52 return path 53 files: list[dict] = get_blueprints() 54 if path not in [file["path"] for file in files]: 55 raise ValidationError(_("Blueprint file does not exist")) 56 return path
Ensure the path (if set) specified is retrievable
Inherited Members
class
ManagedSerializer:
59class ManagedSerializer: 60 """Managed Serializer""" 61 62 managed = CharField(read_only=True, allow_null=True)
Managed Serializer
65class MetadataSerializer(PassiveSerializer): 66 """Serializer for blueprint metadata""" 67 68 name = CharField() 69 labels = JSONDictField()
Serializer for blueprint metadata
Inherited Members
72class BlueprintInstanceSerializer(ModelSerializer): 73 """Info about a single blueprint instance file""" 74 75 def validate_path(self, path: str) -> str: 76 """Ensure the path (if set) specified is retrievable""" 77 if path == "" or path.startswith(OCI_PREFIX): 78 return path 79 files: list[dict] = get_blueprints() 80 if path not in [file["path"] for file in files]: 81 raise ValidationError(_("Blueprint file does not exist")) 82 return path 83 84 def validate_content(self, content: str) -> str: 85 """Ensure content (if set) is a valid blueprint""" 86 if content == "": 87 return content 88 context = self.instance.context if self.instance else {} 89 valid, logs = Importer.from_string(content, context).validate() 90 if not valid: 91 raise ValidationError( 92 [ 93 _("Failed to validate blueprint"), 94 *[f"- {x.event}" for x in logs], 95 ] 96 ) 97 return content 98 99 def validate(self, attrs: dict) -> dict: 100 if attrs.get("path", "") == "" and attrs.get("content", "") == "": 101 raise ValidationError(_("Either path or content must be set.")) 102 return super().validate(attrs) 103 104 class Meta: 105 model = BlueprintInstance 106 fields = [ 107 "pk", 108 "name", 109 "path", 110 "context", 111 "last_applied", 112 "last_applied_hash", 113 "status", 114 "enabled", 115 "managed_models", 116 "metadata", 117 "content", 118 ] 119 extra_kwargs = { 120 "status": {"read_only": True}, 121 "last_applied": {"read_only": True}, 122 "last_applied_hash": {"read_only": True}, 123 "managed_models": {"read_only": True}, 124 "metadata": {"read_only": True}, 125 }
Info about a single blueprint instance file
def
validate_path(self, path: str) -> str:
75 def validate_path(self, path: str) -> str: 76 """Ensure the path (if set) specified is retrievable""" 77 if path == "" or path.startswith(OCI_PREFIX): 78 return path 79 files: list[dict] = get_blueprints() 80 if path not in [file["path"] for file in files]: 81 raise ValidationError(_("Blueprint file does not exist")) 82 return path
Ensure the path (if set) specified is retrievable
def
validate_content(self, content: str) -> str:
84 def validate_content(self, content: str) -> str: 85 """Ensure content (if set) is a valid blueprint""" 86 if content == "": 87 return content 88 context = self.instance.context if self.instance else {} 89 valid, logs = Importer.from_string(content, context).validate() 90 if not valid: 91 raise ValidationError( 92 [ 93 _("Failed to validate blueprint"), 94 *[f"- {x.event}" for x in logs], 95 ] 96 ) 97 return content
Ensure content (if set) is a valid blueprint
Inherited Members
class
BlueprintInstanceSerializer.Meta:
104 class Meta: 105 model = BlueprintInstance 106 fields = [ 107 "pk", 108 "name", 109 "path", 110 "context", 111 "last_applied", 112 "last_applied_hash", 113 "status", 114 "enabled", 115 "managed_models", 116 "metadata", 117 "content", 118 ] 119 extra_kwargs = { 120 "status": {"read_only": True}, 121 "last_applied": {"read_only": True}, 122 "last_applied_hash": {"read_only": True}, 123 "managed_models": {"read_only": True}, 124 "metadata": {"read_only": True}, 125 }
model =
<class 'authentik.blueprints.models.BlueprintInstance'>
def
check_blueprint_perms( blueprint: authentik.blueprints.v1.common.Blueprint, user: authentik.core.models.User, explicit_action: str | None = None):
128def check_blueprint_perms(blueprint: Blueprint, user: User, explicit_action: str | None = None): 129 """Check for individual permissions for each model in a blueprint""" 130 for entry in blueprint.entries: 131 full_model = entry.get_model(blueprint) 132 app, __, model = full_model.partition(".") 133 perms = [ 134 f"{app}.add_{model}", 135 f"{app}.change_{model}", 136 f"{app}.delete_{model}", 137 ] 138 if explicit_action: 139 perms = [f"{app}.{explicit_action}_{model}"] 140 for perm in perms: 141 if not user.has_perm(perm): 142 raise PermissionDenied( 143 { 144 entry.id: _( 145 "User lacks permission to create {model}".format_map( 146 { 147 "model": full_model, 148 } 149 ) 150 ) 151 } 152 )
Check for individual permissions for each model in a blueprint
class
BlueprintInstanceViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
155class BlueprintInstanceViewSet(UsedByMixin, ModelViewSet): 156 """Blueprint instances""" 157 158 serializer_class = BlueprintInstanceSerializer 159 queryset = BlueprintInstance.objects.all() 160 search_fields = ["name", "path"] 161 filterset_fields = ["name", "path"] 162 ordering = ["name"] 163 164 class BlueprintImportResultSerializer(PassiveSerializer): 165 """Logs of an attempted blueprint import""" 166 167 logs = LogEventSerializer(many=True, read_only=True) 168 success = BooleanField(read_only=True) 169 170 @extend_schema( 171 responses={ 172 200: ListSerializer( 173 child=inline_serializer( 174 "BlueprintFile", 175 fields={ 176 "path": CharField(), 177 "last_m": DateTimeField(), 178 "hash": CharField(), 179 "meta": MetadataSerializer(required=False, read_only=True), 180 }, 181 ) 182 ) 183 } 184 ) 185 @action(detail=False, pagination_class=None, filter_backends=[]) 186 def available(self, request: Request) -> Response: 187 """Get blueprints""" 188 files: list[dict] = get_blueprints() 189 return Response(files) 190 191 @permission_required("authentik_blueprints.view_blueprintinstance") 192 @extend_schema( 193 request=None, 194 responses={ 195 200: BlueprintInstanceSerializer(), 196 }, 197 ) 198 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 199 def apply(self, request: Request, *args, **kwargs) -> Response: 200 """Apply a blueprint""" 201 blueprint = self.get_object() 202 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 203 return self.retrieve(request, *args, **kwargs) 204 205 @extend_schema( 206 request={"multipart/form-data": BlueprintUploadSerializer}, 207 responses={ 208 204: BlueprintImportResultSerializer, 209 400: BlueprintImportResultSerializer, 210 }, 211 ) 212 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 213 @validate( 214 BlueprintUploadSerializer, 215 ) 216 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 217 """Import blueprint from .yaml file and apply it once, without creating an instance""" 218 string_contents = "" 219 if body.validated_data.get("file"): 220 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 221 string_contents = file.read().decode() 222 elif body.validated_data.get("path"): 223 string_contents = BlueprintInstance( 224 path=body.validated_data.get("path") 225 ).retrieve_file() 226 else: 227 raise ValidationError("Either path or file must be set") 228 importer = Importer.from_string(string_contents) 229 230 check_blueprint_perms(importer.blueprint, request.user) 231 232 valid, logs = importer.validate() 233 234 import_response = self.BlueprintImportResultSerializer( 235 data={ 236 "logs": [], 237 "success": False, 238 } 239 ) 240 import_response.is_valid(raise_exception=True) 241 242 import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs] 243 import_response.initial_data["success"] = valid 244 import_response.is_valid() 245 if not valid: 246 return Response(data=import_response.initial_data, status=200) 247 248 successful = importer.apply() 249 import_response.initial_data["success"] = successful 250 import_response.is_valid() 251 if not successful: 252 return Response(data=import_response.initial_data, status=200) 253 return Response(data=import_response.initial_data, status=200)
Blueprint instances
serializer_class =
<class 'BlueprintInstanceSerializer'>
@extend_schema(responses={200: ListSerializer(child=inline_serializer('BlueprintFile', fields={'path': CharField(), 'last_m': DateTimeField(), 'hash': CharField(), 'meta': MetadataSerializer(required=False, read_only=True)}))})
@action(detail=False, pagination_class=None, filter_backends=[])
def
available( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
170 @extend_schema( 171 responses={ 172 200: ListSerializer( 173 child=inline_serializer( 174 "BlueprintFile", 175 fields={ 176 "path": CharField(), 177 "last_m": DateTimeField(), 178 "hash": CharField(), 179 "meta": MetadataSerializer(required=False, read_only=True), 180 }, 181 ) 182 ) 183 } 184 ) 185 @action(detail=False, pagination_class=None, filter_backends=[]) 186 def available(self, request: Request) -> Response: 187 """Get blueprints""" 188 files: list[dict] = get_blueprints() 189 return Response(files)
Get blueprints
@extend_schema(request=None, responses={200: BlueprintInstanceSerializer()})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
def
apply( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
191 @permission_required("authentik_blueprints.view_blueprintinstance") 192 @extend_schema( 193 request=None, 194 responses={ 195 200: BlueprintInstanceSerializer(), 196 }, 197 ) 198 @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"]) 199 def apply(self, request: Request, *args, **kwargs) -> Response: 200 """Apply a blueprint""" 201 blueprint = self.get_object() 202 apply_blueprint.send_with_options(args=(blueprint.pk,), rel_obj=blueprint) 203 return self.retrieve(request, *args, **kwargs)
Apply a blueprint
@extend_schema(request={'multipart/form-data': BlueprintUploadSerializer}, responses={204: BlueprintImportResultSerializer, 400: BlueprintImportResultSerializer})
@action(url_path='import', detail=False, methods=['POST'], parser_classes=(MultiPartParser,))
@validate(BlueprintUploadSerializer)
def
import_( self, request: rest_framework.request.Request, body: BlueprintUploadSerializer) -> rest_framework.response.Response:
205 @extend_schema( 206 request={"multipart/form-data": BlueprintUploadSerializer}, 207 responses={ 208 204: BlueprintImportResultSerializer, 209 400: BlueprintImportResultSerializer, 210 }, 211 ) 212 @action(url_path="import", detail=False, methods=["POST"], parser_classes=(MultiPartParser,)) 213 @validate( 214 BlueprintUploadSerializer, 215 ) 216 def import_(self, request: Request, body: BlueprintUploadSerializer) -> Response: 217 """Import blueprint from .yaml file and apply it once, without creating an instance""" 218 string_contents = "" 219 if body.validated_data.get("file"): 220 file = cast(InMemoryUploadedFile, body.validated_data["file"]) 221 string_contents = file.read().decode() 222 elif body.validated_data.get("path"): 223 string_contents = BlueprintInstance( 224 path=body.validated_data.get("path") 225 ).retrieve_file() 226 else: 227 raise ValidationError("Either path or file must be set") 228 importer = Importer.from_string(string_contents) 229 230 check_blueprint_perms(importer.blueprint, request.user) 231 232 valid, logs = importer.validate() 233 234 import_response = self.BlueprintImportResultSerializer( 235 data={ 236 "logs": [], 237 "success": False, 238 } 239 ) 240 import_response.is_valid(raise_exception=True) 241 242 import_response.initial_data["logs"] = [LogEventSerializer(log).data for log in logs] 243 import_response.initial_data["success"] = valid 244 import_response.is_valid() 245 if not valid: 246 return Response(data=import_response.initial_data, status=200) 247 248 successful = importer.apply() 249 import_response.initial_data["success"] = successful 250 import_response.is_valid() 251 if not successful: 252 return Response(data=import_response.initial_data, status=200) 253 return Response(data=import_response.initial_data, status=200)
Import blueprint from .yaml file and apply it once, without creating an instance
Inherited Members
class
BlueprintInstanceViewSet.BlueprintImportResultSerializer(authentik.core.api.utils.PassiveSerializer):
164 class BlueprintImportResultSerializer(PassiveSerializer): 165 """Logs of an attempted blueprint import""" 166 167 logs = LogEventSerializer(many=True, read_only=True) 168 success = BooleanField(read_only=True)
Logs of an attempted blueprint import