authentik.core.api.transactional_applications

transactional application and provider creation

  1"""transactional application and provider creation"""
  2
  3from django.apps import apps
  4from django.db.models import Model
  5from drf_spectacular.utils import PolymorphicProxySerializer, extend_schema, extend_schema_field
  6from rest_framework.exceptions import ValidationError
  7from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, ListField
  8from rest_framework.permissions import IsAuthenticated
  9from rest_framework.request import Request
 10from rest_framework.response import Response
 11from rest_framework.views import APIView
 12from yaml import ScalarNode
 13
 14from authentik.api.validation import validate
 15from authentik.blueprints.api import check_blueprint_perms
 16from authentik.blueprints.v1.common import (
 17    Blueprint,
 18    BlueprintEntry,
 19    BlueprintEntryDesiredState,
 20    EntryInvalidError,
 21    KeyOf,
 22)
 23from authentik.blueprints.v1.importer import Importer
 24from authentik.core.api.applications import ApplicationSerializer
 25from authentik.core.api.utils import PassiveSerializer
 26from authentik.core.models import Application, Provider
 27from authentik.lib.utils.reflection import all_subclasses
 28from authentik.policies.api.bindings import PolicyBindingSerializer
 29
 30
 31def get_provider_serializer_mapping():
 32    """Get a mapping of all providers' model names and their serializers"""
 33    mapping = {}
 34    for model in all_subclasses(Provider):
 35        if model._meta.abstract:
 36            continue
 37        mapping[f"{model._meta.app_label}.{model._meta.model_name}"] = model().serializer
 38    return mapping
 39
 40
# Schema annotation only: the field is described as a polymorphic union of the
# provider serializers, with `provider_model` named as the resource type field.
# The mapping function is passed uncalled (no parentheses); presumably
# drf-spectacular invokes it lazily when the schema is generated — TODO confirm.
@extend_schema_field(
    PolymorphicProxySerializer(
        component_name="model",
        serializers=get_provider_serializer_mapping,
        resource_type_field_name="provider_model",
    )
)
class TransactionProviderField(DictField):
    """Dictionary field which can hold provider creation data"""

    # No behaviour is overridden here; at runtime this is a plain DictField and
    # the subclass exists to carry the schema annotation above.
 50
 51
 52class TransactionPolicyBindingSerializer(PolicyBindingSerializer):
 53    """PolicyBindingSerializer which does not require target as target is set implicitly"""
 54
 55    def validate(self, attrs):
 56        # As the PolicyBindingSerializer checks that the correct things can be bound to a target
 57        # but we don't have a target here as that's set by the blueprint, pass in an empty app
 58        # which will have the correct allowed combination of group/user/policy.
 59        attrs["target"] = Application()
 60        return super().validate(attrs)
 61
 62    class Meta(PolicyBindingSerializer.Meta):
 63        fields = [x for x in PolicyBindingSerializer.Meta.fields if x != "target"]
 64
 65
 66class TransactionApplicationSerializer(PassiveSerializer):
 67    """Serializer for creating a provider and an application in one transaction"""
 68
 69    app = ApplicationSerializer()
 70    provider_model = ChoiceField(choices=list(get_provider_serializer_mapping().keys()))
 71    provider = TransactionProviderField()
 72
 73    policy_bindings = TransactionPolicyBindingSerializer(many=True, required=False)
 74
 75    _provider_model: type[Provider] = None
 76
 77    def validate_provider_model(self, fq_model_name: str) -> str:
 78        """Validate that the model exists and is a provider"""
 79        if "." not in fq_model_name:
 80            raise ValidationError("Invalid provider model")
 81        try:
 82            app, _, model_name = fq_model_name.partition(".")
 83            model = apps.get_model(app, model_name)
 84            if not issubclass(model, Provider):
 85                raise ValidationError("Invalid provider model")
 86            self._provider_model = model
 87        except LookupError:
 88            raise ValidationError("Invalid provider model") from None
 89        return fq_model_name
 90
 91    def validate(self, attrs: dict) -> dict:
 92        blueprint = Blueprint()
 93        blueprint.entries.append(
 94            BlueprintEntry(
 95                model=attrs["provider_model"],
 96                state=BlueprintEntryDesiredState.MUST_CREATED,
 97                identifiers={
 98                    "name": attrs["provider"]["name"],
 99                },
100                # Must match the name of the field on `self`
101                id="provider",
102                attrs=attrs["provider"],
103            )
104        )
105        app_data = attrs["app"]
106        app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
107        blueprint.entries.append(
108            BlueprintEntry(
109                model="authentik_core.application",
110                state=BlueprintEntryDesiredState.MUST_CREATED,
111                identifiers={
112                    "slug": attrs["app"]["slug"],
113                },
114                attrs=app_data,
115                # Must match the name of the field on `self`
116                id="app",
117            )
118        )
119        for binding in attrs.get("policy_bindings", []):
120            binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
121            for key, value in binding.items():
122                if not isinstance(value, Model):
123                    continue
124                binding[key] = value.pk
125            blueprint.entries.append(
126                BlueprintEntry(
127                    model="authentik_policies.policybinding",
128                    state=BlueprintEntryDesiredState.MUST_CREATED,
129                    identifiers=binding,
130                )
131            )
132        importer = Importer(blueprint, {})
133        try:
134            valid, _ = importer.validate(raise_validation_errors=True)
135            if not valid:
136                raise ValidationError("Invalid blueprint")
137        except EntryInvalidError as exc:
138            raise ValidationError(
139                {
140                    exc.entry_id: exc.validation_error.detail,
141                }
142            ) from None
143        return blueprint
144
145
class TransactionApplicationResponseSerializer(PassiveSerializer):
    """Transactional creation response"""

    # Outcome of applying the blueprint; TransactionalApplicationView.put fills
    # this with the return value of Importer.apply().
    applied = BooleanField()
    # List of log message strings. TransactionalApplicationView.put in this
    # module always sends an empty list here.
    logs = ListField(child=CharField())
151
152
class TransactionalApplicationView(APIView):
    """Create provider and application and attach them in a single transaction"""

    # Any authenticated user may call the endpoint; the per-model check happens
    # inside put() through check_blueprint_perms().
    permission_classes = [IsAuthenticated]

    @extend_schema(
        request=TransactionApplicationSerializer(),
        responses={
            200: TransactionApplicationResponseSerializer(),
        },
    )
    @validate(TransactionApplicationSerializer)
    def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
        """Convert data into a blueprint, validate it and apply it"""
        # The serializer's validate() returns a Blueprint, so that is what
        # validated_data holds here.
        blueprint: Blueprint = body.validated_data
        check_blueprint_perms(blueprint, request.user, explicit_action="add")
        applied = Importer(blueprint, {}).apply()
        # `logs` is always sent as an empty list.
        return Response({"applied": applied, "logs": []}, status=200)
def get_provider_serializer_mapping():
32def get_provider_serializer_mapping():
33    """Get a mapping of all providers' model names and their serializers"""
34    mapping = {}
35    for model in all_subclasses(Provider):
36        if model._meta.abstract:
37            continue
38        mapping[f"{model._meta.app_label}.{model._meta.model_name}"] = model().serializer
39    return mapping

Get a mapping of all providers' model names and their serializers

@extend_schema_field(PolymorphicProxySerializer(component_name='model', serializers=get_provider_serializer_mapping, resource_type_field_name='provider_model'))
class TransactionProviderField(rest_framework.fields.DictField):
42@extend_schema_field(
43    PolymorphicProxySerializer(
44        component_name="model",
45        serializers=get_provider_serializer_mapping,
46        resource_type_field_name="provider_model",
47    )
48)
49class TransactionProviderField(DictField):
50    """Dictionary field which can hold provider creation data"""

Dictionary field which can hold provider creation data

class TransactionPolicyBindingSerializer(authentik.policies.api.bindings.PolicyBindingSerializer):
53class TransactionPolicyBindingSerializer(PolicyBindingSerializer):
54    """PolicyBindingSerializer which does not require target as target is set implicitly"""
55
56    def validate(self, attrs):
57        # As the PolicyBindingSerializer checks that the correct things can be bound to a target
58        # but we don't have a target here as that's set by the blueprint, pass in an empty app
59        # which will have the correct allowed combination of group/user/policy.
60        attrs["target"] = Application()
61        return super().validate(attrs)
62
63    class Meta(PolicyBindingSerializer.Meta):
64        fields = [x for x in PolicyBindingSerializer.Meta.fields if x != "target"]

PolicyBindingSerializer which does not require target as target is set implicitly

def validate(self, attrs):
56    def validate(self, attrs):
57        # As the PolicyBindingSerializer checks that the correct things can be bound to a target
58        # but we don't have a target here as that's set by the blueprint, pass in an empty app
59        # which will have the correct allowed combination of group/user/policy.
60        attrs["target"] = Application()
61        return super().validate(attrs)

Check that either policy, group or user is set.

class TransactionPolicyBindingSerializer.Meta(authentik.policies.api.bindings.PolicyBindingSerializer.Meta):
63    class Meta(PolicyBindingSerializer.Meta):
64        fields = [x for x in PolicyBindingSerializer.Meta.fields if x != "target"]
fields = ['pk', 'policy', 'group', 'user', 'policy_obj', 'group_obj', 'user_obj', 'negate', 'enabled', 'order', 'timeout', 'failure_result']
class TransactionApplicationSerializer(authentik.core.api.utils.PassiveSerializer):
 67class TransactionApplicationSerializer(PassiveSerializer):
 68    """Serializer for creating a provider and an application in one transaction"""
 69
 70    app = ApplicationSerializer()
 71    provider_model = ChoiceField(choices=list(get_provider_serializer_mapping().keys()))
 72    provider = TransactionProviderField()
 73
 74    policy_bindings = TransactionPolicyBindingSerializer(many=True, required=False)
 75
 76    _provider_model: type[Provider] = None
 77
 78    def validate_provider_model(self, fq_model_name: str) -> str:
 79        """Validate that the model exists and is a provider"""
 80        if "." not in fq_model_name:
 81            raise ValidationError("Invalid provider model")
 82        try:
 83            app, _, model_name = fq_model_name.partition(".")
 84            model = apps.get_model(app, model_name)
 85            if not issubclass(model, Provider):
 86                raise ValidationError("Invalid provider model")
 87            self._provider_model = model
 88        except LookupError:
 89            raise ValidationError("Invalid provider model") from None
 90        return fq_model_name
 91
 92    def validate(self, attrs: dict) -> dict:
 93        blueprint = Blueprint()
 94        blueprint.entries.append(
 95            BlueprintEntry(
 96                model=attrs["provider_model"],
 97                state=BlueprintEntryDesiredState.MUST_CREATED,
 98                identifiers={
 99                    "name": attrs["provider"]["name"],
100                },
101                # Must match the name of the field on `self`
102                id="provider",
103                attrs=attrs["provider"],
104            )
105        )
106        app_data = attrs["app"]
107        app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
108        blueprint.entries.append(
109            BlueprintEntry(
110                model="authentik_core.application",
111                state=BlueprintEntryDesiredState.MUST_CREATED,
112                identifiers={
113                    "slug": attrs["app"]["slug"],
114                },
115                attrs=app_data,
116                # Must match the name of the field on `self`
117                id="app",
118            )
119        )
120        for binding in attrs.get("policy_bindings", []):
121            binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
122            for key, value in binding.items():
123                if not isinstance(value, Model):
124                    continue
125                binding[key] = value.pk
126            blueprint.entries.append(
127                BlueprintEntry(
128                    model="authentik_policies.policybinding",
129                    state=BlueprintEntryDesiredState.MUST_CREATED,
130                    identifiers=binding,
131                )
132            )
133        importer = Importer(blueprint, {})
134        try:
135            valid, _ = importer.validate(raise_validation_errors=True)
136            if not valid:
137                raise ValidationError("Invalid blueprint")
138        except EntryInvalidError as exc:
139            raise ValidationError(
140                {
141                    exc.entry_id: exc.validation_error.detail,
142                }
143            ) from None
144        return blueprint

Serializer for creating a provider and an application in one transaction

app
provider_model
provider
policy_bindings
def validate_provider_model(self, fq_model_name: str) -> str:
78    def validate_provider_model(self, fq_model_name: str) -> str:
79        """Validate that the model exists and is a provider"""
80        if "." not in fq_model_name:
81            raise ValidationError("Invalid provider model")
82        try:
83            app, _, model_name = fq_model_name.partition(".")
84            model = apps.get_model(app, model_name)
85            if not issubclass(model, Provider):
86                raise ValidationError("Invalid provider model")
87            self._provider_model = model
88        except LookupError:
89            raise ValidationError("Invalid provider model") from None
90        return fq_model_name

Validate that the model exists and is a provider

def validate(self, attrs: dict) -> dict:
 92    def validate(self, attrs: dict) -> dict:
 93        blueprint = Blueprint()
 94        blueprint.entries.append(
 95            BlueprintEntry(
 96                model=attrs["provider_model"],
 97                state=BlueprintEntryDesiredState.MUST_CREATED,
 98                identifiers={
 99                    "name": attrs["provider"]["name"],
100                },
101                # Must match the name of the field on `self`
102                id="provider",
103                attrs=attrs["provider"],
104            )
105        )
106        app_data = attrs["app"]
107        app_data["provider"] = KeyOf(None, ScalarNode(tag="", value="provider"))
108        blueprint.entries.append(
109            BlueprintEntry(
110                model="authentik_core.application",
111                state=BlueprintEntryDesiredState.MUST_CREATED,
112                identifiers={
113                    "slug": attrs["app"]["slug"],
114                },
115                attrs=app_data,
116                # Must match the name of the field on `self`
117                id="app",
118            )
119        )
120        for binding in attrs.get("policy_bindings", []):
121            binding["target"] = KeyOf(None, ScalarNode(tag="", value="app"))
122            for key, value in binding.items():
123                if not isinstance(value, Model):
124                    continue
125                binding[key] = value.pk
126            blueprint.entries.append(
127                BlueprintEntry(
128                    model="authentik_policies.policybinding",
129                    state=BlueprintEntryDesiredState.MUST_CREATED,
130                    identifiers=binding,
131                )
132            )
133        importer = Importer(blueprint, {})
134        try:
135            valid, _ = importer.validate(raise_validation_errors=True)
136            if not valid:
137                raise ValidationError("Invalid blueprint")
138        except EntryInvalidError as exc:
139            raise ValidationError(
140                {
141                    exc.entry_id: exc.validation_error.detail,
142                }
143            ) from None
144        return blueprint
class TransactionApplicationResponseSerializer(authentik.core.api.utils.PassiveSerializer):
147class TransactionApplicationResponseSerializer(PassiveSerializer):
148    """Transactional creation response"""
149
150    applied = BooleanField()
151    logs = ListField(child=CharField())

Transactional creation response

applied
logs
class TransactionalApplicationView(rest_framework.views.APIView):
154class TransactionalApplicationView(APIView):
155    """Create provider and application and attach them in a single transaction"""
156
157    permission_classes = [IsAuthenticated]
158
159    @extend_schema(
160        request=TransactionApplicationSerializer(),
161        responses={
162            200: TransactionApplicationResponseSerializer(),
163        },
164    )
165    @validate(TransactionApplicationSerializer)
166    def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
167        """Convert data into a blueprint, validate it and apply it"""
168        blueprint: Blueprint = body.validated_data
169        check_blueprint_perms(blueprint, request.user, explicit_action="add")
170        importer = Importer(blueprint, {})
171        applied = importer.apply()
172        response = {"applied": False, "logs": []}
173        response["applied"] = applied
174        return Response(response, status=200)

Create provider and application and attach them in a single transaction

permission_classes = [<class 'rest_framework.permissions.IsAuthenticated'>]
@extend_schema(request=TransactionApplicationSerializer(), responses={200: TransactionApplicationResponseSerializer()})
@validate(TransactionApplicationSerializer)
def put( self, request: rest_framework.request.Request, body: TransactionApplicationSerializer) -> rest_framework.response.Response:
159    @extend_schema(
160        request=TransactionApplicationSerializer(),
161        responses={
162            200: TransactionApplicationResponseSerializer(),
163        },
164    )
165    @validate(TransactionApplicationSerializer)
166    def put(self, request: Request, body: TransactionApplicationSerializer) -> Response:
167        """Convert data into a blueprint, validate it and apply it"""
168        blueprint: Blueprint = body.validated_data
169        check_blueprint_perms(blueprint, request.user, explicit_action="add")
170        importer = Importer(blueprint, {})
171        applied = importer.apply()
172        response = {"applied": False, "logs": []}
173        response["applied"] = applied
174        return Response(response, status=200)

Convert data into a blueprint, validate it and apply it