authentik.blueprints.v1.importer

Blueprint importer

  1"""Blueprint importer"""
  2
  3from contextlib import contextmanager
  4from copy import deepcopy
  5from typing import Any
  6
  7from dacite.config import Config
  8from dacite.core import from_dict
  9from dacite.exceptions import DaciteError
 10from deepmerge import always_merger
 11from django.contrib.auth.models import Permission
 12from django.contrib.contenttypes.models import ContentType
 13from django.core.exceptions import FieldError
 14from django.db.models import Model
 15from django.db.models.query_utils import Q
 16from django.db.transaction import atomic
 17from django.db.utils import IntegrityError
 18from guardian.models import RoleObjectPermission
 19from rest_framework.exceptions import ValidationError
 20from rest_framework.serializers import BaseSerializer, Serializer
 21from structlog.stdlib import BoundLogger, get_logger
 22from yaml import load
 23
 24from authentik.blueprints.v1.common import (
 25    Blueprint,
 26    BlueprintEntry,
 27    BlueprintEntryDesiredState,
 28    BlueprintEntryState,
 29    BlueprintLoader,
 30    EntryInvalidError,
 31)
 32from authentik.blueprints.v1.meta.registry import BaseMetaModel, registry
 33from authentik.core.models import (
 34    AuthenticatedSession,
 35    GroupSourceConnection,
 36    PropertyMapping,
 37    Provider,
 38    Session,
 39    Source,
 40    User,
 41    UserSourceConnection,
 42)
 43from authentik.endpoints.models import Connector
 44from authentik.events.logs import LogEvent, capture_logs
 45from authentik.events.utils import cleanse_dict
 46from authentik.flows.models import Stage
 47from authentik.lib.models import InternallyManagedMixin, SerializerModel
 48from authentik.lib.sentry import SentryIgnoredException
 49from authentik.lib.utils.reflection import get_apps
 50from authentik.outposts.models import OutpostServiceConnection
 51from authentik.policies.models import Policy, PolicyBindingModel
 52from authentik.rbac.models import Role
 53
 54# Context set when the serializer is created in a blueprint context
 55# Update website/docs/customize/blueprints/v1/models.md when used
 56SERIALIZER_CONTEXT_BLUEPRINT = "blueprint_entry"
 57
 58
 59def excluded_models() -> list[type[Model]]:
 60    """Return a list of all excluded models that shouldn't be exposed via API
 61    or other means (internal only, base classes, non-used objects, etc)"""
 62
 63    from django.contrib.auth.models import Group as DjangoGroup
 64    from django.contrib.auth.models import User as DjangoUser
 65
 66    return (
 67        # Django only classes
 68        DjangoUser,
 69        DjangoGroup,
 70        ContentType,
 71        Permission,
 72        RoleObjectPermission,
 73        # Base classes
 74        Provider,
 75        Source,
 76        PropertyMapping,
 77        UserSourceConnection,
 78        GroupSourceConnection,
 79        Stage,
 80        OutpostServiceConnection,
 81        Policy,
 82        PolicyBindingModel,
 83        Connector,
 84        # Classes that have other dependencies
 85        Session,
 86        AuthenticatedSession,
 87    )
 88
 89
 90def is_model_allowed(model: type[Model]) -> bool:
 91    """Check if model is allowed"""
 92    return (
 93        model not in excluded_models()
 94        and issubclass(model, SerializerModel | BaseMetaModel)
 95        and not issubclass(model, InternallyManagedMixin)
 96    )
 97
 98
 99class DoRollback(SentryIgnoredException):
100    """Exception to trigger a rollback"""
101
102
103@contextmanager
104def transaction_rollback():
105    """Enters an atomic transaction and always triggers a rollback at the end of the block."""
106    try:
107        with atomic():
108            yield
109            raise DoRollback()
110    except DoRollback:
111        pass
112
113
114def rbac_models() -> dict:
115    models = {}
116    for app in get_apps():
117        for model in app.get_models():
118            if not is_model_allowed(model):
119                continue
120            models[model._meta.model_name] = app.label
121    return models
122
123
124class Importer:
125    """Import Blueprint from raw dict or YAML/JSON"""
126
127    logger: BoundLogger
128    _import: Blueprint
129
130    def __init__(self, blueprint: Blueprint, context: dict | None = None):
131        self.__pk_map: dict[Any, Model] = {}
132        self._import = blueprint
133        self.logger = get_logger()
134        ctx = self.default_context()
135        always_merger.merge(ctx, self._import.context)
136        if context:
137            always_merger.merge(ctx, context)
138        self._import.context = ctx
139
140    def default_context(self):
141        """Default context"""
142        context = {
143            "goauthentik.io/rbac/models": rbac_models(),
144            "goauthentik.io/enterprise/licensed": False,
145        }
146        try:
147            from authentik.enterprise.license import LicenseKey
148
149            context["goauthentik.io/enterprise/licensed"] = LicenseKey.get_total().status().is_valid
150        except ModuleNotFoundError:
151            pass
152        return context
153
154    @staticmethod
155    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
156        """Parse YAML string and create blueprint importer from it"""
157        import_dict = load(yaml_input, BlueprintLoader)
158        try:
159            _import = from_dict(
160                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
161            )
162        except DaciteError as exc:
163            raise EntryInvalidError from exc
164        return Importer(_import, context)
165
166    @property
167    def blueprint(self) -> Blueprint:
168        """Get imported blueprint"""
169        return self._import
170
171    def __update_pks_for_attrs(self, attrs: dict[str, Any]) -> dict[str, Any]:
172        """Replace any value if it is a known primary key of an other object"""
173
174        def updater(value) -> Any:
175            if value in self.__pk_map:
176                self.logger.debug("Updating reference in entry", value=value)
177                return self.__pk_map[value]
178            return value
179
180        for key, value in attrs.items():
181            try:
182                if isinstance(value, dict):
183                    for _, _inner_key in enumerate(value):
184                        value[_inner_key] = updater(value[_inner_key])
185                elif isinstance(value, list):
186                    for idx, _inner_value in enumerate(value):
187                        attrs[key][idx] = updater(_inner_value)
188                else:
189                    attrs[key] = updater(value)
190            except TypeError:
191                continue
192        return attrs
193
194    def __query_from_identifier(self, attrs: dict[str, Any]) -> Q:
195        """Generate an or'd query from all identifiers in an entry"""
196        # Since identifiers can also be pk-references to other objects (see FlowStageBinding)
197        # we have to ensure those references are also replaced
198        main_query = Q()
199        if "pk" in attrs:
200            main_query = Q(pk=attrs["pk"])
201        sub_query = Q()
202        for identifier, value in attrs.items():
203            if identifier == "pk":
204                continue
205            if isinstance(value, dict):
206                sub_query &= Q(**{f"{identifier}__contains": value})
207            else:
208                sub_query &= Q(**{identifier: value})
209
210        return main_query | sub_query
211
212    def _validate_single(self, entry: BlueprintEntry) -> BaseSerializer | None:  # noqa: PLR0915
213        """Validate a single entry"""
214        if not entry.check_all_conditions_match(self._import):
215            self.logger.debug("One or more conditions of this entry are not fulfilled, skipping")
216            return None
217
218        model_app_label, model_name = entry.get_model(self._import).split(".")
219        try:
220            model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
221        except LookupError as exc:
222            raise EntryInvalidError.from_entry(exc, entry) from exc
223        # Don't use isinstance since we don't want to check for inheritance
224        if not is_model_allowed(model):
225            raise EntryInvalidError.from_entry(f"Model {model} not allowed", entry)
226        if issubclass(model, BaseMetaModel):
227            serializer_class: type[Serializer] = model.serializer()
228            serializer = serializer_class(
229                data=entry.get_attrs(self._import),
230                context={
231                    SERIALIZER_CONTEXT_BLUEPRINT: entry,
232                },
233            )
234            try:
235                serializer.is_valid(raise_exception=True)
236            except ValidationError as exc:
237                raise EntryInvalidError.from_entry(
238                    f"Serializer errors {serializer.errors}",
239                    validation_error=exc,
240                    entry=entry,
241                ) from exc
242            return serializer
243
244        # If we try to validate without referencing a possible instance
245        # we'll get a duplicate error, hence we load the model here and return
246        # the full serializer for later usage
247        # Because a model might have multiple unique columns, we chain all identifiers together
248        # to create an OR query.
249        updated_identifiers = self.__update_pks_for_attrs(entry.get_identifiers(self._import))
250        for key, value in list(updated_identifiers.items()):
251            if isinstance(value, dict) and "pk" in value:
252                del updated_identifiers[key]
253                updated_identifiers[f"{key}"] = value["pk"]
254
255        query = self.__query_from_identifier(updated_identifiers)
256        if not query:
257            raise EntryInvalidError.from_entry("No or invalid identifiers", entry)
258
259        try:
260            existing_models = model.objects.filter(query)
261        except FieldError as exc:
262            raise EntryInvalidError.from_entry(f"Invalid identifier field: {exc}", entry) from exc
263
264        serializer_kwargs = {}
265        model_instance = existing_models.first()
266        override_serializer_instance = False
267        if (
268            not isinstance(model(), BaseMetaModel)
269            and model_instance
270            and entry.state != BlueprintEntryDesiredState.MUST_CREATED
271        ):
272            self.logger.debug(
273                "Initialize serializer with instance",
274                model=model,
275                instance=model_instance,
276                pk=model_instance.pk,
277            )
278            serializer_kwargs["instance"] = model_instance
279            serializer_kwargs["partial"] = True
280        elif model_instance and entry.state == BlueprintEntryDesiredState.MUST_CREATED:
281            msg = (
282                f"State is set to {BlueprintEntryDesiredState.MUST_CREATED.value} "
283                "and object exists already",
284            )
285            raise EntryInvalidError.from_entry(
286                ValidationError({k: msg for k in entry.identifiers.keys()}, "unique"),
287                entry,
288            )
289        else:
290            self.logger.debug(
291                "Initialized new serializer instance",
292                model=model,
293                **cleanse_dict(updated_identifiers),
294            )
295            override_serializer_instance = True
296        try:
297            full_data = self.__update_pks_for_attrs(entry.get_attrs(self._import))
298        except ValueError as exc:
299            raise EntryInvalidError.from_entry(exc, entry) from exc
300        always_merger.merge(full_data, updated_identifiers)
301        serializer_kwargs["data"] = full_data
302
303        serializer: Serializer = model().serializer(
304            context={
305                SERIALIZER_CONTEXT_BLUEPRINT: entry,
306            },
307            **serializer_kwargs,
308        )
309        try:
310            serializer.is_valid(raise_exception=True)
311        except ValidationError as exc:
312            raise EntryInvalidError.from_entry(
313                f"Serializer errors {serializer.errors}",
314                validation_error=exc,
315                entry=entry,
316                serializer=serializer,
317            ) from exc
318        if override_serializer_instance:
319            model_instance = model()
320            # pk needs to be set on the model instance otherwise a new one will be generated
321            if "pk" in updated_identifiers:
322                model_instance.pk = updated_identifiers["pk"]
323            serializer.instance = model_instance
324        return serializer
325
326    def _apply_permissions(self, instance: Model, entry: BlueprintEntry):
327        """Apply object-level permissions for an entry"""
328        for perm in entry.get_permissions(self._import):
329            if perm.user is not None:
330                User.objects.get(pk=perm.user).assign_perms_to_managed_role(
331                    perm.permission, instance
332                )
333            if perm.role is not None:
334                role = Role.objects.get(pk=perm.role)
335                role.assign_perms(perm.permission, obj=instance)
336
337    def apply(self) -> bool:
338        """Apply (create/update) models yaml, in database transaction"""
339        try:
340            with atomic():
341                if not self._apply_models():
342                    self.logger.debug("Reverting changes due to error")
343                    raise IntegrityError
344        except IntegrityError:
345            return False
346        self.logger.debug("Committing changes")
347        return True
348
349    def _apply_models(self, raise_errors=False) -> bool:
350        """Apply (create/update) models yaml"""
351        self.__pk_map = {}
352        for entry in self._import.iter_entries():
353            model_app_label, model_name = entry.get_model(self._import).split(".")
354            try:
355                model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
356            except LookupError:
357                self.logger.warning(
358                    "App or Model does not exist", app=model_app_label, model=model_name
359                )
360                return False
361            # Validate each single entry
362            serializer = None
363            try:
364                serializer = self._validate_single(entry)
365            except EntryInvalidError as exc:
366                # For deleting objects we don't need the serializer to be valid
367                if entry.get_state(self._import) == BlueprintEntryDesiredState.ABSENT:
368                    serializer = exc.serializer
369                else:
370                    self.logger.warning(f"Entry invalid: {exc}", entry=entry, error=exc)
371                    if raise_errors:
372                        raise exc
373                    return False
374            if not serializer:
375                continue
376
377            state = entry.get_state(self._import)
378            if state in [
379                BlueprintEntryDesiredState.PRESENT,
380                BlueprintEntryDesiredState.CREATED,
381                BlueprintEntryDesiredState.MUST_CREATED,
382            ]:
383                instance = serializer.instance
384                if (
385                    instance
386                    and not instance._state.adding
387                    and state == BlueprintEntryDesiredState.CREATED
388                ):
389                    self.logger.debug(
390                        "Instance exists, skipping",
391                        model=model,
392                        instance=instance,
393                        pk=instance.pk,
394                    )
395                else:
396                    instance = serializer.save()
397                    self.logger.debug("Updated model", model=instance)
398                if "pk" in entry.identifiers:
399                    self.__pk_map[entry.identifiers["pk"]] = instance.pk
400                entry._state = BlueprintEntryState(instance)
401                self._apply_permissions(instance, entry)
402            elif state == BlueprintEntryDesiredState.ABSENT:
403                instance: Model | None = serializer.instance
404                if instance and instance.pk:
405                    instance.delete()
406                    self.logger.debug("Deleted model", mode=instance)
407                    continue
408                self.logger.debug("Entry to delete with no instance, skipping")
409        return True
410
411    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
412        """Validate loaded blueprint export, ensure all models are allowed
413        and serializers have no errors"""
414        self.logger.debug("Starting blueprint import validation")
415        orig_import = deepcopy(self._import)
416        if self._import.version != 1:
417            self.logger.warning("Invalid blueprint version")
418            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
419        with (
420            transaction_rollback(),
421            capture_logs() as logs,
422        ):
423            successful = self._apply_models(raise_errors=raise_validation_errors)
424            if not successful:
425                self.logger.warning("Blueprint validation failed")
426        self.logger.debug("Finished blueprint import validation")
427        self._import = orig_import
428        return successful, logs
SERIALIZER_CONTEXT_BLUEPRINT = 'blueprint_entry'
def excluded_models() -> list[type[django.db.models.base.Model]]:
60def excluded_models() -> list[type[Model]]:
61    """Return a list of all excluded models that shouldn't be exposed via API
62    or other means (internal only, base classes, non-used objects, etc)"""
63
64    from django.contrib.auth.models import Group as DjangoGroup
65    from django.contrib.auth.models import User as DjangoUser
66
67    return (
68        # Django only classes
69        DjangoUser,
70        DjangoGroup,
71        ContentType,
72        Permission,
73        RoleObjectPermission,
74        # Base classes
75        Provider,
76        Source,
77        PropertyMapping,
78        UserSourceConnection,
79        GroupSourceConnection,
80        Stage,
81        OutpostServiceConnection,
82        Policy,
83        PolicyBindingModel,
84        Connector,
85        # Classes that have other dependencies
86        Session,
87        AuthenticatedSession,
88    )

Return a list of all excluded models that shouldn't be exposed via API or other means (internal only, base classes, non-used objects, etc)

def is_model_allowed(model: type[django.db.models.base.Model]) -> bool:
91def is_model_allowed(model: type[Model]) -> bool:
92    """Check if model is allowed"""
93    return (
94        model not in excluded_models()
95        and issubclass(model, SerializerModel | BaseMetaModel)
96        and not issubclass(model, InternallyManagedMixin)
97    )

Check if model is allowed

class DoRollback(authentik.lib.sentry.SentryIgnoredException):
100class DoRollback(SentryIgnoredException):
101    """Exception to trigger a rollback"""

Exception to trigger a rollback

@contextmanager
def transaction_rollback():
104@contextmanager
105def transaction_rollback():
106    """Enters an atomic transaction and always triggers a rollback at the end of the block."""
107    try:
108        with atomic():
109            yield
110            raise DoRollback()
111    except DoRollback:
112        pass

Enters an atomic transaction and always triggers a rollback at the end of the block.

def rbac_models() -> dict:
115def rbac_models() -> dict:
116    models = {}
117    for app in get_apps():
118        for model in app.get_models():
119            if not is_model_allowed(model):
120                continue
121            models[model._meta.model_name] = app.label
122    return models
class Importer:
125class Importer:
126    """Import Blueprint from raw dict or YAML/JSON"""
127
128    logger: BoundLogger
129    _import: Blueprint
130
131    def __init__(self, blueprint: Blueprint, context: dict | None = None):
132        self.__pk_map: dict[Any, Model] = {}
133        self._import = blueprint
134        self.logger = get_logger()
135        ctx = self.default_context()
136        always_merger.merge(ctx, self._import.context)
137        if context:
138            always_merger.merge(ctx, context)
139        self._import.context = ctx
140
141    def default_context(self):
142        """Default context"""
143        context = {
144            "goauthentik.io/rbac/models": rbac_models(),
145            "goauthentik.io/enterprise/licensed": False,
146        }
147        try:
148            from authentik.enterprise.license import LicenseKey
149
150            context["goauthentik.io/enterprise/licensed"] = LicenseKey.get_total().status().is_valid
151        except ModuleNotFoundError:
152            pass
153        return context
154
155    @staticmethod
156    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
157        """Parse YAML string and create blueprint importer from it"""
158        import_dict = load(yaml_input, BlueprintLoader)
159        try:
160            _import = from_dict(
161                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
162            )
163        except DaciteError as exc:
164            raise EntryInvalidError from exc
165        return Importer(_import, context)
166
167    @property
168    def blueprint(self) -> Blueprint:
169        """Get imported blueprint"""
170        return self._import
171
172    def __update_pks_for_attrs(self, attrs: dict[str, Any]) -> dict[str, Any]:
173        """Replace any value if it is a known primary key of an other object"""
174
175        def updater(value) -> Any:
176            if value in self.__pk_map:
177                self.logger.debug("Updating reference in entry", value=value)
178                return self.__pk_map[value]
179            return value
180
181        for key, value in attrs.items():
182            try:
183                if isinstance(value, dict):
184                    for _, _inner_key in enumerate(value):
185                        value[_inner_key] = updater(value[_inner_key])
186                elif isinstance(value, list):
187                    for idx, _inner_value in enumerate(value):
188                        attrs[key][idx] = updater(_inner_value)
189                else:
190                    attrs[key] = updater(value)
191            except TypeError:
192                continue
193        return attrs
194
195    def __query_from_identifier(self, attrs: dict[str, Any]) -> Q:
196        """Generate an or'd query from all identifiers in an entry"""
197        # Since identifiers can also be pk-references to other objects (see FlowStageBinding)
198        # we have to ensure those references are also replaced
199        main_query = Q()
200        if "pk" in attrs:
201            main_query = Q(pk=attrs["pk"])
202        sub_query = Q()
203        for identifier, value in attrs.items():
204            if identifier == "pk":
205                continue
206            if isinstance(value, dict):
207                sub_query &= Q(**{f"{identifier}__contains": value})
208            else:
209                sub_query &= Q(**{identifier: value})
210
211        return main_query | sub_query
212
213    def _validate_single(self, entry: BlueprintEntry) -> BaseSerializer | None:  # noqa: PLR0915
214        """Validate a single entry"""
215        if not entry.check_all_conditions_match(self._import):
216            self.logger.debug("One or more conditions of this entry are not fulfilled, skipping")
217            return None
218
219        model_app_label, model_name = entry.get_model(self._import).split(".")
220        try:
221            model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
222        except LookupError as exc:
223            raise EntryInvalidError.from_entry(exc, entry) from exc
224        # Don't use isinstance since we don't want to check for inheritance
225        if not is_model_allowed(model):
226            raise EntryInvalidError.from_entry(f"Model {model} not allowed", entry)
227        if issubclass(model, BaseMetaModel):
228            serializer_class: type[Serializer] = model.serializer()
229            serializer = serializer_class(
230                data=entry.get_attrs(self._import),
231                context={
232                    SERIALIZER_CONTEXT_BLUEPRINT: entry,
233                },
234            )
235            try:
236                serializer.is_valid(raise_exception=True)
237            except ValidationError as exc:
238                raise EntryInvalidError.from_entry(
239                    f"Serializer errors {serializer.errors}",
240                    validation_error=exc,
241                    entry=entry,
242                ) from exc
243            return serializer
244
245        # If we try to validate without referencing a possible instance
246        # we'll get a duplicate error, hence we load the model here and return
247        # the full serializer for later usage
248        # Because a model might have multiple unique columns, we chain all identifiers together
249        # to create an OR query.
250        updated_identifiers = self.__update_pks_for_attrs(entry.get_identifiers(self._import))
251        for key, value in list(updated_identifiers.items()):
252            if isinstance(value, dict) and "pk" in value:
253                del updated_identifiers[key]
254                updated_identifiers[f"{key}"] = value["pk"]
255
256        query = self.__query_from_identifier(updated_identifiers)
257        if not query:
258            raise EntryInvalidError.from_entry("No or invalid identifiers", entry)
259
260        try:
261            existing_models = model.objects.filter(query)
262        except FieldError as exc:
263            raise EntryInvalidError.from_entry(f"Invalid identifier field: {exc}", entry) from exc
264
265        serializer_kwargs = {}
266        model_instance = existing_models.first()
267        override_serializer_instance = False
268        if (
269            not isinstance(model(), BaseMetaModel)
270            and model_instance
271            and entry.state != BlueprintEntryDesiredState.MUST_CREATED
272        ):
273            self.logger.debug(
274                "Initialize serializer with instance",
275                model=model,
276                instance=model_instance,
277                pk=model_instance.pk,
278            )
279            serializer_kwargs["instance"] = model_instance
280            serializer_kwargs["partial"] = True
281        elif model_instance and entry.state == BlueprintEntryDesiredState.MUST_CREATED:
282            msg = (
283                f"State is set to {BlueprintEntryDesiredState.MUST_CREATED.value} "
284                "and object exists already",
285            )
286            raise EntryInvalidError.from_entry(
287                ValidationError({k: msg for k in entry.identifiers.keys()}, "unique"),
288                entry,
289            )
290        else:
291            self.logger.debug(
292                "Initialized new serializer instance",
293                model=model,
294                **cleanse_dict(updated_identifiers),
295            )
296            override_serializer_instance = True
297        try:
298            full_data = self.__update_pks_for_attrs(entry.get_attrs(self._import))
299        except ValueError as exc:
300            raise EntryInvalidError.from_entry(exc, entry) from exc
301        always_merger.merge(full_data, updated_identifiers)
302        serializer_kwargs["data"] = full_data
303
304        serializer: Serializer = model().serializer(
305            context={
306                SERIALIZER_CONTEXT_BLUEPRINT: entry,
307            },
308            **serializer_kwargs,
309        )
310        try:
311            serializer.is_valid(raise_exception=True)
312        except ValidationError as exc:
313            raise EntryInvalidError.from_entry(
314                f"Serializer errors {serializer.errors}",
315                validation_error=exc,
316                entry=entry,
317                serializer=serializer,
318            ) from exc
319        if override_serializer_instance:
320            model_instance = model()
321            # pk needs to be set on the model instance otherwise a new one will be generated
322            if "pk" in updated_identifiers:
323                model_instance.pk = updated_identifiers["pk"]
324            serializer.instance = model_instance
325        return serializer
326
327    def _apply_permissions(self, instance: Model, entry: BlueprintEntry):
328        """Apply object-level permissions for an entry"""
329        for perm in entry.get_permissions(self._import):
330            if perm.user is not None:
331                User.objects.get(pk=perm.user).assign_perms_to_managed_role(
332                    perm.permission, instance
333                )
334            if perm.role is not None:
335                role = Role.objects.get(pk=perm.role)
336                role.assign_perms(perm.permission, obj=instance)
337
338    def apply(self) -> bool:
339        """Apply (create/update) models yaml, in database transaction"""
340        try:
341            with atomic():
342                if not self._apply_models():
343                    self.logger.debug("Reverting changes due to error")
344                    raise IntegrityError
345        except IntegrityError:
346            return False
347        self.logger.debug("Committing changes")
348        return True
349
350    def _apply_models(self, raise_errors=False) -> bool:
351        """Apply (create/update) models yaml"""
352        self.__pk_map = {}
353        for entry in self._import.iter_entries():
354            model_app_label, model_name = entry.get_model(self._import).split(".")
355            try:
356                model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
357            except LookupError:
358                self.logger.warning(
359                    "App or Model does not exist", app=model_app_label, model=model_name
360                )
361                return False
362            # Validate each single entry
363            serializer = None
364            try:
365                serializer = self._validate_single(entry)
366            except EntryInvalidError as exc:
367                # For deleting objects we don't need the serializer to be valid
368                if entry.get_state(self._import) == BlueprintEntryDesiredState.ABSENT:
369                    serializer = exc.serializer
370                else:
371                    self.logger.warning(f"Entry invalid: {exc}", entry=entry, error=exc)
372                    if raise_errors:
373                        raise exc
374                    return False
375            if not serializer:
376                continue
377
378            state = entry.get_state(self._import)
379            if state in [
380                BlueprintEntryDesiredState.PRESENT,
381                BlueprintEntryDesiredState.CREATED,
382                BlueprintEntryDesiredState.MUST_CREATED,
383            ]:
384                instance = serializer.instance
385                if (
386                    instance
387                    and not instance._state.adding
388                    and state == BlueprintEntryDesiredState.CREATED
389                ):
390                    self.logger.debug(
391                        "Instance exists, skipping",
392                        model=model,
393                        instance=instance,
394                        pk=instance.pk,
395                    )
396                else:
397                    instance = serializer.save()
398                    self.logger.debug("Updated model", model=instance)
399                if "pk" in entry.identifiers:
400                    self.__pk_map[entry.identifiers["pk"]] = instance.pk
401                entry._state = BlueprintEntryState(instance)
402                self._apply_permissions(instance, entry)
403            elif state == BlueprintEntryDesiredState.ABSENT:
404                instance: Model | None = serializer.instance
405                if instance and instance.pk:
406                    instance.delete()
407                    self.logger.debug("Deleted model", mode=instance)
408                    continue
409                self.logger.debug("Entry to delete with no instance, skipping")
410        return True
411
412    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
413        """Validate loaded blueprint export, ensure all models are allowed
414        and serializers have no errors"""
415        self.logger.debug("Starting blueprint import validation")
416        orig_import = deepcopy(self._import)
417        if self._import.version != 1:
418            self.logger.warning("Invalid blueprint version")
419            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
420        with (
421            transaction_rollback(),
422            capture_logs() as logs,
423        ):
424            successful = self._apply_models(raise_errors=raise_validation_errors)
425            if not successful:
426                self.logger.warning("Blueprint validation failed")
427        self.logger.debug("Finished blueprint import validation")
428        self._import = orig_import
429        return successful, logs

Import Blueprint from raw dict or YAML/JSON

Importer( blueprint: authentik.blueprints.v1.common.Blueprint, context: dict | None = None)
131    def __init__(self, blueprint: Blueprint, context: dict | None = None):
132        self.__pk_map: dict[Any, Model] = {}
133        self._import = blueprint
134        self.logger = get_logger()
135        ctx = self.default_context()
136        always_merger.merge(ctx, self._import.context)
137        if context:
138            always_merger.merge(ctx, context)
139        self._import.context = ctx
logger: structlog.stdlib.BoundLogger
def default_context(self):
141    def default_context(self):
142        """Default context"""
143        context = {
144            "goauthentik.io/rbac/models": rbac_models(),
145            "goauthentik.io/enterprise/licensed": False,
146        }
147        try:
148            from authentik.enterprise.license import LicenseKey
149
150            context["goauthentik.io/enterprise/licensed"] = LicenseKey.get_total().status().is_valid
151        except ModuleNotFoundError:
152            pass
153        return context

Default context

@staticmethod
def from_string( yaml_input: str, context: dict | None = None) -> Importer:
155    @staticmethod
156    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
157        """Parse YAML string and create blueprint importer from it"""
158        import_dict = load(yaml_input, BlueprintLoader)
159        try:
160            _import = from_dict(
161                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
162            )
163        except DaciteError as exc:
164            raise EntryInvalidError from exc
165        return Importer(_import, context)

Parse YAML string and create blueprint importer from it

167    @property
168    def blueprint(self) -> Blueprint:
169        """Get imported blueprint"""
170        return self._import

Get imported blueprint

def apply(self) -> bool:
338    def apply(self) -> bool:
339        """Apply (create/update) models yaml, in database transaction"""
340        try:
341            with atomic():
342                if not self._apply_models():
343                    self.logger.debug("Reverting changes due to error")
344                    raise IntegrityError
345        except IntegrityError:
346            return False
347        self.logger.debug("Committing changes")
348        return True

Apply (create/update) models yaml, in database transaction

def validate( self, raise_validation_errors=False) -> tuple[bool, list[authentik.events.logs.LogEvent]]:
412    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
413        """Validate loaded blueprint export, ensure all models are allowed
414        and serializers have no errors"""
415        self.logger.debug("Starting blueprint import validation")
416        orig_import = deepcopy(self._import)
417        if self._import.version != 1:
418            self.logger.warning("Invalid blueprint version")
419            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
420        with (
421            transaction_rollback(),
422            capture_logs() as logs,
423        ):
424            successful = self._apply_models(raise_errors=raise_validation_errors)
425            if not successful:
426                self.logger.warning("Blueprint validation failed")
427        self.logger.debug("Finished blueprint import validation")
428        self._import = orig_import
429        return successful, logs

Validate loaded blueprint export, ensure all models are allowed and serializers have no errors