authentik.core.api.transactional_applications
transactional application and provider creation
"""transactional application and provider creation"""

from django.apps import apps
from django.db.models import Model
from drf_spectacular.utils import PolymorphicProxySerializer, extend_schema, extend_schema_field
from rest_framework.exceptions import ValidationError
from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, ListField
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from yaml import ScalarNode

from authentik.api.validation import validate
from authentik.blueprints.api import check_blueprint_perms
from authentik.blueprints.v1.common import (
    Blueprint,
    BlueprintEntry,
    BlueprintEntryDesiredState,
    EntryInvalidError,
    KeyOf,
)
from authentik.blueprints.v1.importer import Importer
from authentik.core.api.applications import ApplicationSerializer
from authentik.core.api.utils import PassiveSerializer
from authentik.core.models import Application, Provider
from authentik.lib.utils.reflection import all_subclasses
from authentik.policies.api.bindings import PolicyBindingSerializer


def get_provider_serializer_mapping():
    """Get a mapping of all providers' model names and their serializers

    Keys have the form ``<app_label>.<model_name>``; abstract provider models
    are skipped."""
    return {
        f"{model._meta.app_label}.{model._meta.model_name}": model().serializer
        for model in all_subclasses(Provider)
        if not model._meta.abstract
    }


@extend_schema_field(
    PolymorphicProxySerializer(
        component_name="model",
        serializers=get_provider_serializer_mapping,
        resource_type_field_name="provider_model",
    )
)
class TransactionProviderField(DictField):
    """Dictionary field which can hold provider creation data"""


class TransactionPolicyBindingSerializer(PolicyBindingSerializer):
    """PolicyBindingSerializer which does not require target as target is set implicitly"""

    def validate(self, attrs):
        # As the PolicyBindingSerializer checks that the correct things can be bound to a target
        # but we don't have a target here as that's set by the blueprint, pass in an empty app
        # which will have the correct allowed combination of group/user/policy.
        attrs["target"] = Application()
        return super().validate(attrs)

    class Meta(PolicyBindingSerializer.Meta):
        fields = [x for x in PolicyBindingSerializer.Meta.fields if x != "target"]


class TransactionApplicationSerializer(PassiveSerializer):
    """Serializer for creating a provider and an application in one transaction"""

    app = ApplicationSerializer()
    provider_model = ChoiceField(choices=list(get_provider_serializer_mapping()))
    provider = TransactionProviderField()

    policy_bindings = TransactionPolicyBindingSerializer(many=True, required=False)

    # Set by `validate_provider_model` once the model name has been resolved
    _provider_model: type[Provider] = None

    def validate_provider_model(self, fq_model_name: str) -> str:
        """Validate that the model exists and is a provider

        The resolved model class is stored on ``self._provider_model``.
        Raises ``ValidationError`` for names without a dot, unknown models and
        models that are not ``Provider`` subclasses."""
        app_label, separator, model_name = fq_model_name.partition(".")
        if not separator:
            raise ValidationError("Invalid provider model")
        try:
            model = apps.get_model(app_label, model_name)
        except LookupError:
            raise ValidationError("Invalid provider model") from None
        if not issubclass(model, Provider):
            raise ValidationError("Invalid provider model")
        self._provider_model = model
        return fq_model_name

    def validate(self, attrs: dict) -> Blueprint:
        """Build a blueprint from the validated fields and validate it

        The blueprint gets one entry for the provider, one for the application
        and one per policy binding. Note that the ``Blueprint`` itself is
        returned instead of ``attrs``.

        Raises ``ValidationError`` when the provider data has no ``name``, when
        the importer reports the blueprint as invalid, or when an entry fails
        validation (errors are then keyed by the entry's id)."""
        provider_data = attrs["provider"]
        # The provider payload is a free-form dict, so `name` is not guaranteed to
        # be present; report it as a field error rather than failing on a KeyError.
        if "name" not in provider_data:
            raise ValidationError({"provider": {"name": ["This field is required."]}})

        blueprint = Blueprint()
        blueprint.entries.append(
            BlueprintEntry(
                model=attrs["provider_model"],
                state=BlueprintEntryDesiredState.MUST_CREATED,
                identifiers={
                    "name": provider_data["name"],
                },
                # Must match the name of the field on `self`
                id="provider",
                attrs=provider_data,
            )
        )
        app_data = attrs["app"]
        # The value "provider" matches the id of the provider entry above
        app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
        blueprint.entries.append(
            BlueprintEntry(
                model="authentik_core.application",
                state=BlueprintEntryDesiredState.MUST_CREATED,
                identifiers={
                    "slug": app_data["slug"],
                },
                attrs=app_data,
                # Must match the name of the field on `self`
                id="app",
            )
        )
        for binding in attrs.get("policy_bindings", []):
            # The value "app" matches the id of the application entry above
            binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
            # Replace model instances with their primary keys
            for key, value in binding.items():
                if isinstance(value, Model):
                    binding[key] = value.pk
            blueprint.entries.append(
                BlueprintEntry(
                    model="authentik_policies.policybinding",
                    state=BlueprintEntryDesiredState.MUST_CREATED,
                    identifiers=binding,
                )
            )
        importer = Importer(blueprint, {})
        try:
            valid, _ = importer.validate(raise_validation_errors=True)
        except EntryInvalidError as exc:
            raise ValidationError(
                {
                    exc.entry_id: exc.validation_error.detail,
                }
            ) from None
        if not valid:
            raise ValidationError("Invalid blueprint")
        return blueprint


class TransactionApplicationResponseSerializer(PassiveSerializer):
    """Transactional creation response"""

    applied = BooleanField()
    logs = ListField(child=CharField())


class TransactionalApplicationView(APIView):
    """Create provider and application and attach them in a single transaction"""

    permission_classes = [IsAuthenticated]

    @extend_schema(
        request=TransactionApplicationSerializer(),
        responses={
            200: TransactionApplicationResponseSerializer(),
        },
    )
    @validate(TransactionApplicationSerializer)
    def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
        """Convert data into a blueprint, validate it and apply it"""
        # The serializer's `validate` returns the blueprint, so it is the validated data
        blueprint: Blueprint = body.validated_data
        check_blueprint_perms(blueprint, request.user, explicit_action="add")
        applied = Importer(blueprint, {}).apply()
        # `logs` is always returned empty here
        return Response({"applied": applied, "logs": []}, status=200)
def
get_provider_serializer_mapping():
def get_provider_serializer_mapping():
    """Map every concrete provider model to its serializer.

    Keys have the form ``<app_label>.<model_name>``; the value is the
    ``serializer`` attribute of a freshly created instance of that model.
    Abstract models are left out.
    """
    return {
        f"{provider_cls._meta.app_label}.{provider_cls._meta.model_name}": provider_cls().serializer
        for provider_cls in all_subclasses(Provider)
        if not provider_cls._meta.abstract
    }
Get a mapping of all providers' model names and their serializers
@extend_schema_field(PolymorphicProxySerializer(component_name='model', serializers=get_provider_serializer_mapping, resource_type_field_name='provider_model'))
class
@extend_schema_field(
    PolymorphicProxySerializer(
        resource_type_field_name="provider_model",
        serializers=get_provider_serializer_mapping,
        component_name="model",
    )
)
class TransactionProviderField(DictField):
    """Dict field carrying the data used to create a provider.

    In the schema it is described as a polymorphic serializer, using
    `provider_model` as the resource type field.
    """
Dictionary field which can hold provider creation data
class TransactionPolicyBindingSerializer(PolicyBindingSerializer):
    """Policy binding serializer without a `target` field.

    The target is not supplied by the client; it is filled in implicitly."""

    class Meta(PolicyBindingSerializer.Meta):
        # Same fields as the parent serializer, minus `target`
        fields = [
            field_name
            for field_name in PolicyBindingSerializer.Meta.fields
            if field_name != "target"
        ]

    def validate(self, attrs):
        # The parent serializer validates what may be bound to a target. No real
        # target exists yet (the blueprint sets it later), so an empty application
        # is passed in to allow the correct group/user/policy combination.
        placeholder_target = Application()
        attrs["target"] = placeholder_target
        return super().validate(attrs)
PolicyBindingSerializer which does not require target as target is set implicitly
def
validate(self, attrs):
def validate(self, attrs):
    # The parent serializer validates what may be bound to a target. No real
    # target exists yet (the blueprint sets it later), so an empty application
    # is passed in to allow the correct group/user/policy combination.
    placeholder_target = Application()
    attrs["target"] = placeholder_target
    return super().validate(attrs)
Check that either policy, group or user is set.
class
TransactionPolicyBindingSerializer.Meta(authentik.policies.api.bindings.PolicyBindingSerializer.Meta):
class Meta(PolicyBindingSerializer.Meta):
    # Same fields as the parent serializer, minus `target`
    fields = [
        field_name
        for field_name in PolicyBindingSerializer.Meta.fields
        if field_name != "target"
    ]
fields =
['pk', 'policy', 'group', 'user', 'policy_obj', 'group_obj', 'user_obj', 'negate', 'enabled', 'order', 'timeout', 'failure_result']
Inherited Members
class TransactionApplicationSerializer(PassiveSerializer):
    """Serializer for creating a provider and an application in one transaction"""

    app = ApplicationSerializer()
    provider_model = ChoiceField(choices=list(get_provider_serializer_mapping()))
    provider = TransactionProviderField()

    policy_bindings = TransactionPolicyBindingSerializer(many=True, required=False)

    # Set by `validate_provider_model` once the model name has been resolved
    _provider_model: type[Provider] = None

    def validate_provider_model(self, fq_model_name: str) -> str:
        """Validate that the model exists and is a provider

        The resolved model class is stored on ``self._provider_model``.
        Raises ``ValidationError`` for names without a dot, unknown models and
        models that are not ``Provider`` subclasses."""
        app_label, separator, model_name = fq_model_name.partition(".")
        if not separator:
            raise ValidationError("Invalid provider model")
        try:
            model = apps.get_model(app_label, model_name)
        except LookupError:
            raise ValidationError("Invalid provider model") from None
        if not issubclass(model, Provider):
            raise ValidationError("Invalid provider model")
        self._provider_model = model
        return fq_model_name

    def validate(self, attrs: dict) -> Blueprint:
        """Build a blueprint from the validated fields and validate it

        The blueprint gets one entry for the provider, one for the application
        and one per policy binding. Note that the ``Blueprint`` itself is
        returned instead of ``attrs``.

        Raises ``ValidationError`` when the provider data has no ``name``, when
        the importer reports the blueprint as invalid, or when an entry fails
        validation (errors are then keyed by the entry's id)."""
        provider_data = attrs["provider"]
        # The provider payload is a free-form dict, so `name` is not guaranteed to
        # be present; report it as a field error rather than failing on a KeyError.
        if "name" not in provider_data:
            raise ValidationError({"provider": {"name": ["This field is required."]}})

        blueprint = Blueprint()
        blueprint.entries.append(
            BlueprintEntry(
                model=attrs["provider_model"],
                state=BlueprintEntryDesiredState.MUST_CREATED,
                identifiers={
                    "name": provider_data["name"],
                },
                # Must match the name of the field on `self`
                id="provider",
                attrs=provider_data,
            )
        )
        app_data = attrs["app"]
        # The value "provider" matches the id of the provider entry above
        app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
        blueprint.entries.append(
            BlueprintEntry(
                model="authentik_core.application",
                state=BlueprintEntryDesiredState.MUST_CREATED,
                identifiers={
                    "slug": app_data["slug"],
                },
                attrs=app_data,
                # Must match the name of the field on `self`
                id="app",
            )
        )
        for binding in attrs.get("policy_bindings", []):
            # The value "app" matches the id of the application entry above
            binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
            # Replace model instances with their primary keys
            for key, value in binding.items():
                if isinstance(value, Model):
                    binding[key] = value.pk
            blueprint.entries.append(
                BlueprintEntry(
                    model="authentik_policies.policybinding",
                    state=BlueprintEntryDesiredState.MUST_CREATED,
                    identifiers=binding,
                )
            )
        importer = Importer(blueprint, {})
        try:
            valid, _ = importer.validate(raise_validation_errors=True)
        except EntryInvalidError as exc:
            raise ValidationError(
                {
                    exc.entry_id: exc.validation_error.detail,
                }
            ) from None
        if not valid:
            raise ValidationError("Invalid blueprint")
        return blueprint
Serializer for creating a provider and an application in one transaction
def
validate_provider_model(self, fq_model_name: str) -> str:
def validate_provider_model(self, fq_model_name: str) -> str:
    """Check that the dotted name refers to an installed provider model.

    On success the model class is remembered on ``self._provider_model`` and
    the name is returned unchanged. A name without a dot, an unknown model or
    a model that is not a ``Provider`` subclass raises ``ValidationError``.
    """
    app_label, separator, model_name = fq_model_name.partition(".")
    # An empty separator means the name contained no dot at all
    if not separator:
        raise ValidationError("Invalid provider model")
    try:
        resolved = apps.get_model(app_label, model_name)
    except LookupError:
        raise ValidationError("Invalid provider model") from None
    if not issubclass(resolved, Provider):
        raise ValidationError("Invalid provider model")
    self._provider_model = resolved
    return fq_model_name
Validate that the model exists and is a provider
def
validate(self, attrs: dict) -> dict:
def validate(self, attrs: dict) -> Blueprint:
    """Build a blueprint from the validated fields and validate it

    The blueprint gets one entry for the provider, one for the application
    and one per policy binding. Note that the ``Blueprint`` itself is
    returned instead of ``attrs``.

    Raises ``ValidationError`` when the provider data has no ``name``, when
    the importer reports the blueprint as invalid, or when an entry fails
    validation (errors are then keyed by the entry's id)."""
    provider_data = attrs["provider"]
    # The provider payload is a free-form dict, so `name` is not guaranteed to
    # be present; report it as a field error rather than failing on a KeyError.
    if "name" not in provider_data:
        raise ValidationError({"provider": {"name": ["This field is required."]}})

    blueprint = Blueprint()
    blueprint.entries.append(
        BlueprintEntry(
            model=attrs["provider_model"],
            state=BlueprintEntryDesiredState.MUST_CREATED,
            identifiers={
                "name": provider_data["name"],
            },
            # Must match the name of the field on `self`
            id="provider",
            attrs=provider_data,
        )
    )
    app_data = attrs["app"]
    # The value "provider" matches the id of the provider entry above
    app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
    blueprint.entries.append(
        BlueprintEntry(
            model="authentik_core.application",
            state=BlueprintEntryDesiredState.MUST_CREATED,
            identifiers={
                "slug": app_data["slug"],
            },
            attrs=app_data,
            # Must match the name of the field on `self`
            id="app",
        )
    )
    for binding in attrs.get("policy_bindings", []):
        # The value "app" matches the id of the application entry above
        binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
        # Replace model instances with their primary keys
        for key, value in binding.items():
            if isinstance(value, Model):
                binding[key] = value.pk
        blueprint.entries.append(
            BlueprintEntry(
                model="authentik_policies.policybinding",
                state=BlueprintEntryDesiredState.MUST_CREATED,
                identifiers=binding,
            )
        )
    importer = Importer(blueprint, {})
    try:
        valid, _ = importer.validate(raise_validation_errors=True)
    except EntryInvalidError as exc:
        raise ValidationError(
            {
                exc.entry_id: exc.validation_error.detail,
            }
        ) from None
    if not valid:
        raise ValidationError("Invalid blueprint")
    return blueprint
Inherited Members
class TransactionApplicationResponseSerializer(PassiveSerializer):
    """Transactional creation response

    Describes the body returned by the transactional application endpoint."""

    # Whether the blueprint was applied
    applied = BooleanField()
    # Log messages, as a list of strings
    logs = ListField(child=CharField())
Transactional creation response
Inherited Members
class
TransactionalApplicationView(rest_framework.views.APIView):
class TransactionalApplicationView(APIView):
    """API view which creates a provider and an application, linked together,
    in a single transaction"""

    permission_classes = [IsAuthenticated]

    @extend_schema(
        request=TransactionApplicationSerializer(),
        responses={
            200: TransactionApplicationResponseSerializer(),
        },
    )
    @validate(TransactionApplicationSerializer)
    def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
        """Turn the request body into a blueprint, check permissions and apply it"""
        # The serializer's `validate` returns the blueprint, so it is the validated data
        blueprint: Blueprint = body.validated_data
        check_blueprint_perms(blueprint, request.user, explicit_action="add")
        was_applied = Importer(blueprint, {}).apply()
        # `logs` is always returned empty here
        return Response({"applied": was_applied, "logs": []}, status=200)
Create provider and application and attach them in a single transaction
@extend_schema(request=TransactionApplicationSerializer(), responses={200: TransactionApplicationResponseSerializer()})
@validate(TransactionApplicationSerializer)
def
put( self, request: rest_framework.request.Request, body: TransactionApplicationSerializer) -> rest_framework.response.Response:
@extend_schema(
    request=TransactionApplicationSerializer(),
    responses={
        200: TransactionApplicationResponseSerializer(),
    },
)
@validate(TransactionApplicationSerializer)
def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
    """Turn the request body into a blueprint, check permissions and apply it"""
    # The serializer's `validate` returns the blueprint, so it is the validated data
    blueprint: Blueprint = body.validated_data
    check_blueprint_perms(blueprint, request.user, explicit_action="add")
    was_applied = Importer(blueprint, {}).apply()
    # `logs` is always returned empty here
    return Response({"applied": was_applied, "logs": []}, status=200)
Convert data into a blueprint, validate it and apply it