authentik.blueprints.v1.importer

Blueprint importer

  1"""Blueprint importer"""
  2
  3from contextlib import contextmanager
  4from copy import deepcopy
  5from typing import Any
  6
  7from dacite.config import Config
  8from dacite.core import from_dict
  9from dacite.exceptions import DaciteError
 10from deepmerge import always_merger
 11from django.contrib.auth.models import Permission
 12from django.contrib.contenttypes.models import ContentType
 13from django.core.exceptions import FieldError
 14from django.db.models import Model
 15from django.db.models.query_utils import Q
 16from django.db.transaction import atomic
 17from django.db.utils import IntegrityError
 18from guardian.models import RoleObjectPermission
 19from rest_framework.exceptions import ValidationError
 20from rest_framework.serializers import BaseSerializer, Serializer
 21from structlog.stdlib import BoundLogger, get_logger
 22from yaml import load
 23
 24from authentik.blueprints.v1.common import (
 25    Blueprint,
 26    BlueprintEntry,
 27    BlueprintEntryDesiredState,
 28    BlueprintEntryState,
 29    BlueprintLoader,
 30    EntryInvalidError,
 31)
 32from authentik.blueprints.v1.meta.registry import BaseMetaModel, registry
 33from authentik.core.models import (
 34    AuthenticatedSession,
 35    GroupSourceConnection,
 36    PropertyMapping,
 37    Provider,
 38    Session,
 39    Source,
 40    User,
 41    UserSourceConnection,
 42)
 43from authentik.endpoints.models import Connector
 44from authentik.events.logs import LogEvent, capture_logs
 45from authentik.events.utils import cleanse_dict
 46from authentik.flows.models import Stage
 47from authentik.lib.models import InternallyManagedMixin, SerializerModel
 48from authentik.lib.sentry import SentryIgnoredException
 49from authentik.lib.utils.reflection import get_apps
 50from authentik.outposts.models import OutpostServiceConnection
 51from authentik.policies.models import Policy, PolicyBindingModel
 52from authentik.rbac.models import Role
 53
# Serializer-context key under which the importer passes the BlueprintEntry
# being processed, i.e. it is set when the serializer is created in a
# blueprint context.
# Update website/docs/customize/blueprints/v1/models.md when used
SERIALIZER_CONTEXT_BLUEPRINT = "blueprint_entry"
 57
 58
 59def excluded_models() -> list[type[Model]]:
 60    """Return a list of all excluded models that shouldn't be exposed via API
 61    or other means (internal only, base classes, non-used objects, etc)"""
 62
 63    from django.contrib.auth.models import Group as DjangoGroup
 64    from django.contrib.auth.models import User as DjangoUser
 65
 66    return (
 67        # Django only classes
 68        DjangoUser,
 69        DjangoGroup,
 70        ContentType,
 71        Permission,
 72        RoleObjectPermission,
 73        # Base classes
 74        Provider,
 75        Source,
 76        PropertyMapping,
 77        UserSourceConnection,
 78        GroupSourceConnection,
 79        Stage,
 80        OutpostServiceConnection,
 81        Policy,
 82        PolicyBindingModel,
 83        Connector,
 84        # Classes that have other dependencies
 85        Session,
 86        AuthenticatedSession,
 87    )
 88
 89
 90def is_model_allowed(model: type[Model]) -> bool:
 91    """Check if model is allowed"""
 92    return (
 93        model not in excluded_models()
 94        and issubclass(model, SerializerModel | BaseMetaModel)
 95        and not issubclass(model, InternallyManagedMixin)
 96    )
 97
 98
class DoRollback(SentryIgnoredException):
    """Exception raised on purpose to make an enclosing atomic block roll back"""
101
102
@contextmanager
def transaction_rollback():
    """Run the wrapped block inside ``atomic()`` and always roll it back afterwards.

    After the wrapped block has finished, ``DoRollback`` is raised while still
    inside the atomic block and swallowed again outside of it. Any other
    exception raised by the wrapped block is not caught here.
    """
    try:
        with atomic():
            yield
            # Leave the atomic block through an exception to force the rollback
            raise DoRollback
    except DoRollback:
        return
112
113
def rbac_models() -> dict:
    """Map the model name of every allowed model to the label of its app.

    Only models for which ``is_model_allowed`` returns True are included.
    """
    return {
        candidate._meta.model_name: app.label
        for app in get_apps()
        for candidate in app.get_models()
        if is_model_allowed(candidate)
    }
122
123
class Importer:
    """Import Blueprint from raw dict or YAML/JSON

    ``validate()`` applies all entries inside a transaction that is always
    rolled back, ``apply()`` applies them inside a transaction that is only
    rolled back on failure.
    """

    logger: BoundLogger
    _import: Blueprint

    def __init__(self, blueprint: Blueprint, context: dict | None = None):
        """Create an importer for ``blueprint``

        The blueprint's context is replaced by the merge of the default
        context, the blueprint's own context and the optional ``context``
        argument, merged in that order.
        """
        # Maps the ``pk`` identifier as written in the blueprint to the primary
        # key of the object that was actually saved for that entry.
        self.__pk_map: dict[Any, Any] = {}
        self._import = blueprint
        self.logger = get_logger()
        ctx = self.default_context()
        always_merger.merge(ctx, self._import.context)
        if context:
            always_merger.merge(ctx, context)
        self._import.context = ctx

    def default_context(self):
        """Default context

        Contains the allowed models by app and whether a valid enterprise
        license is present. The license flag stays False when the enterprise
        module cannot be imported.
        """
        context = {
            "goauthentik.io/rbac/models": rbac_models(),
            "goauthentik.io/enterprise/licensed": False,
        }
        try:
            from authentik.enterprise.license import LicenseKey

            # Store the value itself: wrapping it in a one-element tuple would
            # make the flag truthy regardless of the license status.
            context["goauthentik.io/enterprise/licensed"] = (
                LicenseKey.get_total().status().is_valid
            )
        except ModuleNotFoundError:
            pass
        return context

    @staticmethod
    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
        """Parse YAML string and create blueprint importer from it

        Raises EntryInvalidError when the parsed data cannot be converted into
        a Blueprint.
        """
        # NOTE(review): what the YAML may construct is decided entirely by
        # BlueprintLoader - confirm it derives from a safe loader before
        # passing untrusted input.
        import_dict = load(yaml_input, BlueprintLoader)
        try:
            _import = from_dict(
                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
            )
        except DaciteError as exc:
            raise EntryInvalidError from exc
        return Importer(_import, context)

    @property
    def blueprint(self) -> Blueprint:
        """Get imported blueprint"""
        return self._import

    def __update_pks_for_attrs(self, attrs: dict[str, Any]) -> dict[str, Any]:
        """Replace any value if it is a known primary key of another object

        Top-level values as well as the direct members of dict and list values
        are checked, deeper nesting is left untouched. ``attrs`` is modified in
        place and returned.
        """

        def updater(value) -> Any:
            try:
                known = value in self.__pk_map
            except TypeError:
                # Unhashable values can never be a key of the map. Handling
                # this per value keeps one unhashable member from preventing
                # the replacement of the remaining members of its container.
                return value
            if known:
                self.logger.debug("Updating reference in entry", value=value)
                return self.__pk_map[value]
            return value

        for key, value in attrs.items():
            if isinstance(value, dict):
                for inner_key in value:
                    value[inner_key] = updater(value[inner_key])
            elif isinstance(value, list):
                for idx, inner_value in enumerate(value):
                    value[idx] = updater(inner_value)
            else:
                attrs[key] = updater(value)
        return attrs

    def __query_from_identifier(self, attrs: dict[str, Any]) -> Q:
        """Build the lookup query for an entry from its identifiers

        All identifiers except ``pk`` are AND-ed together, dict values being
        matched with a ``__contains`` lookup. If a ``pk`` identifier is present
        it is OR-ed with that combination. Without any identifiers an empty
        query is returned.
        """
        main_query = Q()
        if "pk" in attrs:
            main_query = Q(pk=attrs["pk"])
        sub_query = Q()
        for identifier, value in attrs.items():
            if identifier == "pk":
                continue
            if isinstance(value, dict):
                sub_query &= Q(**{f"{identifier}__contains": value})
            else:
                sub_query &= Q(**{identifier: value})

        return main_query | sub_query

    def _validate_single(self, entry: BlueprintEntry) -> BaseSerializer | None:  # noqa: PLR0915
        """Validate a single entry

        Returns None when the entry's conditions are not fulfilled, otherwise a
        serializer that passed validation. For regular models the serializer
        has its ``instance`` set, either to the existing object matched by the
        identifiers or to a fresh, unsaved model instance.

        Raises EntryInvalidError when the model is unknown or not allowed, the
        identifiers are missing or invalid, the object already exists although
        the state requires it to be created, or the serializer reports errors.
        """
        if not entry.check_all_conditions_match(self._import):
            self.logger.debug("One or more conditions of this entry are not fulfilled, skipping")
            return None

        model_app_label, model_name = entry.get_model(self._import).split(".")
        try:
            model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
        except LookupError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
        if not is_model_allowed(model):
            raise EntryInvalidError.from_entry(f"Model {model} not allowed", entry)
        if issubclass(model, BaseMetaModel):
            # Meta models have no database lookup, their serializer is built
            # from the entry's attributes only
            serializer_class: type[Serializer] = model.serializer()
            serializer = serializer_class(
                data=entry.get_attrs(self._import),
                context={
                    SERIALIZER_CONTEXT_BLUEPRINT: entry,
                },
            )
            try:
                serializer.is_valid(raise_exception=True)
            except ValidationError as exc:
                raise EntryInvalidError.from_entry(
                    f"Serializer errors {serializer.errors}",
                    validation_error=exc,
                    entry=entry,
                ) from exc
            return serializer

        # If we try to validate without referencing a possible instance
        # we'll get a duplicate error, hence we load the model here and return
        # the full serializer for later usage.
        # Identifiers can also be pk-references to other objects (see FlowStageBinding),
        # so those references are replaced before the query is built.
        updated_identifiers = self.__update_pks_for_attrs(entry.get_identifiers(self._import))
        for key, value in list(updated_identifiers.items()):
            # Collapse identifiers given as a dict with a ``pk`` to that pk
            if isinstance(value, dict) and "pk" in value:
                updated_identifiers[key] = value["pk"]

        query = self.__query_from_identifier(updated_identifiers)
        if not query:
            raise EntryInvalidError.from_entry("No or invalid identifiers", entry)

        try:
            existing_models = model.objects.filter(query)
        except FieldError as exc:
            raise EntryInvalidError.from_entry(f"Invalid identifier field: {exc}", entry) from exc

        serializer_kwargs = {}
        model_instance = existing_models.first()
        override_serializer_instance = False
        # Use the resolved state, the same way _apply_models does, rather than
        # the raw ``state`` attribute of the entry
        state = entry.get_state(self._import)
        # Meta models have already been handled above, so only the existence of
        # the instance and the desired state matter here
        if model_instance and state != BlueprintEntryDesiredState.MUST_CREATED:
            self.logger.debug(
                "Initialize serializer with instance",
                model=model,
                instance=model_instance,
                pk=model_instance.pk,
            )
            serializer_kwargs["instance"] = model_instance
            serializer_kwargs["partial"] = True
        elif model_instance and state == BlueprintEntryDesiredState.MUST_CREATED:
            msg = (
                f"State is set to {BlueprintEntryDesiredState.MUST_CREATED.value} "
                "and object exists already",
            )
            raise EntryInvalidError.from_entry(
                ValidationError({k: msg for k in entry.identifiers}, "unique"),
                entry,
            )
        else:
            self.logger.debug(
                "Initialized new serializer instance",
                model=model,
                **cleanse_dict(updated_identifiers),
            )
            override_serializer_instance = True
        try:
            full_data = self.__update_pks_for_attrs(entry.get_attrs(self._import))
        except ValueError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
        always_merger.merge(full_data, updated_identifiers)
        serializer_kwargs["data"] = full_data

        serializer: Serializer = model().serializer(
            context={
                SERIALIZER_CONTEXT_BLUEPRINT: entry,
            },
            **serializer_kwargs,
        )
        try:
            serializer.is_valid(raise_exception=True)
        except ValidationError as exc:
            # The serializer is attached so that entries which should be
            # absent can still be deleted via its instance
            raise EntryInvalidError.from_entry(
                f"Serializer errors {serializer.errors}",
                validation_error=exc,
                entry=entry,
                serializer=serializer,
            ) from exc
        if override_serializer_instance:
            model_instance = model()
            # pk needs to be set on the model instance otherwise a new one will be generated
            if "pk" in updated_identifiers:
                model_instance.pk = updated_identifiers["pk"]
            serializer.instance = model_instance
        return serializer

    def _apply_permissions(self, instance: Model, entry: BlueprintEntry):
        """Apply object-level permissions for an entry

        Each permission of the entry is assigned on ``instance`` to the
        referenced user (through the user's managed role) and/or to the
        referenced role.
        """
        for perm in entry.get_permissions(self._import):
            if perm.user is not None:
                User.objects.get(pk=perm.user).assign_perms_to_managed_role(
                    perm.permission, instance
                )
            if perm.role is not None:
                role = Role.objects.get(pk=perm.role)
                role.assign_perms(perm.permission, obj=instance)

    def apply(self) -> bool:
        """Apply (create/update) models yaml, in database transaction

        Returns True when all entries were applied and the transaction was
        committed, False when it was rolled back.
        """
        try:
            with atomic():
                if not self._apply_models():
                    self.logger.debug("Reverting changes due to error")
                    # Raised only to leave the atomic block with an exception
                    raise IntegrityError
        except IntegrityError:
            # Also reached for an IntegrityError raised while applying entries
            return False
        self.logger.debug("Committing changes")
        return True

    def _apply_models(self, raise_errors=False) -> bool:
        """Apply (create/update) models yaml

        Entries are processed in order; the first unknown model or invalid
        entry stops processing and makes this return False. With
        ``raise_errors`` set, the EntryInvalidError of an invalid entry is
        re-raised instead. Returns True when all entries were processed.
        """
        self.__pk_map = {}
        for entry in self._import.iter_entries():
            model_app_label, model_name = entry.get_model(self._import).split(".")
            try:
                model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
            except LookupError:
                self.logger.warning(
                    "App or Model does not exist", app=model_app_label, model=model_name
                )
                return False
            # Validate each single entry
            serializer = None
            try:
                serializer = self._validate_single(entry)
            except EntryInvalidError as exc:
                # For deleting objects we don't need the serializer to be valid
                if entry.get_state(self._import) == BlueprintEntryDesiredState.ABSENT:
                    serializer = exc.serializer
                else:
                    self.logger.warning(f"Entry invalid: {exc}", entry=entry, error=exc)
                    if raise_errors:
                        raise
                    return False
            if not serializer:
                continue

            state = entry.get_state(self._import)
            if state in [
                BlueprintEntryDesiredState.PRESENT,
                BlueprintEntryDesiredState.CREATED,
                BlueprintEntryDesiredState.MUST_CREATED,
            ]:
                instance = serializer.instance
                if (
                    instance
                    and not instance._state.adding
                    and state == BlueprintEntryDesiredState.CREATED
                ):
                    # ``created`` only creates, objects that exist are left untouched
                    self.logger.debug(
                        "Instance exists, skipping",
                        model=model,
                        instance=instance,
                        pk=instance.pk,
                    )
                else:
                    instance = serializer.save()
                    self.logger.debug("Updated model", model=instance)
                # Remember the real primary key so later entries referencing
                # the blueprint's pk can be rewritten
                if "pk" in entry.identifiers:
                    self.__pk_map[entry.identifiers["pk"]] = instance.pk
                entry._state = BlueprintEntryState(instance)
                self._apply_permissions(instance, entry)
            elif state == BlueprintEntryDesiredState.ABSENT:
                instance: Model | None = serializer.instance
                if instance and instance.pk:
                    instance.delete()
                    self.logger.debug("Deleted model", model=instance)
                    continue
                self.logger.debug("Entry to delete with no instance, skipping")
        return True

    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
        """Validate loaded blueprint export, ensure all models are allowed
        and serializers have no errors

        All entries are applied inside a transaction that is always rolled
        back. Returns whether that succeeded together with the logs captured
        meanwhile. With ``raise_validation_errors`` set, the EntryInvalidError
        of an invalid entry propagates to the caller instead.
        """
        self.logger.debug("Starting blueprint import validation")
        if self._import.version != 1:
            self.logger.warning("Invalid blueprint version")
            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
        # Applying entries modifies the blueprint (e.g. the state stored on
        # each entry), so a copy is kept to undo that after the dry run
        orig_import = deepcopy(self._import)
        try:
            with (
                transaction_rollback(),
                capture_logs() as logs,
            ):
                successful = self._apply_models(raise_errors=raise_validation_errors)
                if not successful:
                    self.logger.warning("Blueprint validation failed")
        finally:
            # Restore the blueprint even when a validation error is propagated
            self._import = orig_import
        self.logger.debug("Finished blueprint import validation")
        return successful, logs
SERIALIZER_CONTEXT_BLUEPRINT = 'blueprint_entry'
def excluded_models() -> list[type[django.db.models.base.Model]]:
60def excluded_models() -> list[type[Model]]:
61    """Return a list of all excluded models that shouldn't be exposed via API
62    or other means (internal only, base classes, non-used objects, etc)"""
63
64    from django.contrib.auth.models import Group as DjangoGroup
65    from django.contrib.auth.models import User as DjangoUser
66
67    return (
68        # Django only classes
69        DjangoUser,
70        DjangoGroup,
71        ContentType,
72        Permission,
73        RoleObjectPermission,
74        # Base classes
75        Provider,
76        Source,
77        PropertyMapping,
78        UserSourceConnection,
79        GroupSourceConnection,
80        Stage,
81        OutpostServiceConnection,
82        Policy,
83        PolicyBindingModel,
84        Connector,
85        # Classes that have other dependencies
86        Session,
87        AuthenticatedSession,
88    )

Return a list of all excluded models that shouldn't be exposed via API or other means (internal only, base classes, non-used objects, etc)

def is_model_allowed(model: type[django.db.models.base.Model]) -> bool:
91def is_model_allowed(model: type[Model]) -> bool:
92    """Check if model is allowed"""
93    return (
94        model not in excluded_models()
95        and issubclass(model, SerializerModel | BaseMetaModel)
96        and not issubclass(model, InternallyManagedMixin)
97    )

Check if model is allowed

class DoRollback(authentik.lib.sentry.SentryIgnoredException):
100class DoRollback(SentryIgnoredException):
101    """Exception to trigger a rollback"""

Exception to trigger a rollback

@contextmanager
def transaction_rollback():
104@contextmanager
105def transaction_rollback():
106    """Enters an atomic transaction and always triggers a rollback at the end of the block."""
107    try:
108        with atomic():
109            yield
110            raise DoRollback()
111    except DoRollback:
112        pass

Enters an atomic transaction and always triggers a rollback at the end of the block.

def rbac_models() -> dict:
115def rbac_models() -> dict:
116    models = {}
117    for app in get_apps():
118        for model in app.get_models():
119            if not is_model_allowed(model):
120                continue
121            models[model._meta.model_name] = app.label
122    return models
class Importer:
125class Importer:
126    """Import Blueprint from raw dict or YAML/JSON"""
127
128    logger: BoundLogger
129    _import: Blueprint
130
131    def __init__(self, blueprint: Blueprint, context: dict | None = None):
132        self.__pk_map: dict[Any, Model] = {}
133        self._import = blueprint
134        self.logger = get_logger()
135        ctx = self.default_context()
136        always_merger.merge(ctx, self._import.context)
137        if context:
138            always_merger.merge(ctx, context)
139        self._import.context = ctx
140
141    def default_context(self):
142        """Default context"""
143        context = {
144            "goauthentik.io/rbac/models": rbac_models(),
145            "goauthentik.io/enterprise/licensed": False,
146        }
147        try:
148            from authentik.enterprise.license import LicenseKey
149
150            context["goauthentik.io/enterprise/licensed"] = (
151                LicenseKey.get_total().status().is_valid,
152            )
153        except ModuleNotFoundError:
154            pass
155        return context
156
157    @staticmethod
158    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
159        """Parse YAML string and create blueprint importer from it"""
160        import_dict = load(yaml_input, BlueprintLoader)
161        try:
162            _import = from_dict(
163                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
164            )
165        except DaciteError as exc:
166            raise EntryInvalidError from exc
167        return Importer(_import, context)
168
169    @property
170    def blueprint(self) -> Blueprint:
171        """Get imported blueprint"""
172        return self._import
173
174    def __update_pks_for_attrs(self, attrs: dict[str, Any]) -> dict[str, Any]:
175        """Replace any value if it is a known primary key of an other object"""
176
177        def updater(value) -> Any:
178            if value in self.__pk_map:
179                self.logger.debug("Updating reference in entry", value=value)
180                return self.__pk_map[value]
181            return value
182
183        for key, value in attrs.items():
184            try:
185                if isinstance(value, dict):
186                    for _, _inner_key in enumerate(value):
187                        value[_inner_key] = updater(value[_inner_key])
188                elif isinstance(value, list):
189                    for idx, _inner_value in enumerate(value):
190                        attrs[key][idx] = updater(_inner_value)
191                else:
192                    attrs[key] = updater(value)
193            except TypeError:
194                continue
195        return attrs
196
197    def __query_from_identifier(self, attrs: dict[str, Any]) -> Q:
198        """Generate an or'd query from all identifiers in an entry"""
199        # Since identifiers can also be pk-references to other objects (see FlowStageBinding)
200        # we have to ensure those references are also replaced
201        main_query = Q()
202        if "pk" in attrs:
203            main_query = Q(pk=attrs["pk"])
204        sub_query = Q()
205        for identifier, value in attrs.items():
206            if identifier == "pk":
207                continue
208            if isinstance(value, dict):
209                sub_query &= Q(**{f"{identifier}__contains": value})
210            else:
211                sub_query &= Q(**{identifier: value})
212
213        return main_query | sub_query
214
215    def _validate_single(self, entry: BlueprintEntry) -> BaseSerializer | None:  # noqa: PLR0915
216        """Validate a single entry"""
217        if not entry.check_all_conditions_match(self._import):
218            self.logger.debug("One or more conditions of this entry are not fulfilled, skipping")
219            return None
220
221        model_app_label, model_name = entry.get_model(self._import).split(".")
222        try:
223            model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
224        except LookupError as exc:
225            raise EntryInvalidError.from_entry(exc, entry) from exc
226        # Don't use isinstance since we don't want to check for inheritance
227        if not is_model_allowed(model):
228            raise EntryInvalidError.from_entry(f"Model {model} not allowed", entry)
229        if issubclass(model, BaseMetaModel):
230            serializer_class: type[Serializer] = model.serializer()
231            serializer = serializer_class(
232                data=entry.get_attrs(self._import),
233                context={
234                    SERIALIZER_CONTEXT_BLUEPRINT: entry,
235                },
236            )
237            try:
238                serializer.is_valid(raise_exception=True)
239            except ValidationError as exc:
240                raise EntryInvalidError.from_entry(
241                    f"Serializer errors {serializer.errors}",
242                    validation_error=exc,
243                    entry=entry,
244                ) from exc
245            return serializer
246
247        # If we try to validate without referencing a possible instance
248        # we'll get a duplicate error, hence we load the model here and return
249        # the full serializer for later usage
250        # Because a model might have multiple unique columns, we chain all identifiers together
251        # to create an OR query.
252        updated_identifiers = self.__update_pks_for_attrs(entry.get_identifiers(self._import))
253        for key, value in list(updated_identifiers.items()):
254            if isinstance(value, dict) and "pk" in value:
255                del updated_identifiers[key]
256                updated_identifiers[f"{key}"] = value["pk"]
257
258        query = self.__query_from_identifier(updated_identifiers)
259        if not query:
260            raise EntryInvalidError.from_entry("No or invalid identifiers", entry)
261
262        try:
263            existing_models = model.objects.filter(query)
264        except FieldError as exc:
265            raise EntryInvalidError.from_entry(f"Invalid identifier field: {exc}", entry) from exc
266
267        serializer_kwargs = {}
268        model_instance = existing_models.first()
269        override_serializer_instance = False
270        if (
271            not isinstance(model(), BaseMetaModel)
272            and model_instance
273            and entry.state != BlueprintEntryDesiredState.MUST_CREATED
274        ):
275            self.logger.debug(
276                "Initialize serializer with instance",
277                model=model,
278                instance=model_instance,
279                pk=model_instance.pk,
280            )
281            serializer_kwargs["instance"] = model_instance
282            serializer_kwargs["partial"] = True
283        elif model_instance and entry.state == BlueprintEntryDesiredState.MUST_CREATED:
284            msg = (
285                f"State is set to {BlueprintEntryDesiredState.MUST_CREATED.value} "
286                "and object exists already",
287            )
288            raise EntryInvalidError.from_entry(
289                ValidationError({k: msg for k in entry.identifiers.keys()}, "unique"),
290                entry,
291            )
292        else:
293            self.logger.debug(
294                "Initialized new serializer instance",
295                model=model,
296                **cleanse_dict(updated_identifiers),
297            )
298            override_serializer_instance = True
299        try:
300            full_data = self.__update_pks_for_attrs(entry.get_attrs(self._import))
301        except ValueError as exc:
302            raise EntryInvalidError.from_entry(exc, entry) from exc
303        always_merger.merge(full_data, updated_identifiers)
304        serializer_kwargs["data"] = full_data
305
306        serializer: Serializer = model().serializer(
307            context={
308                SERIALIZER_CONTEXT_BLUEPRINT: entry,
309            },
310            **serializer_kwargs,
311        )
312        try:
313            serializer.is_valid(raise_exception=True)
314        except ValidationError as exc:
315            raise EntryInvalidError.from_entry(
316                f"Serializer errors {serializer.errors}",
317                validation_error=exc,
318                entry=entry,
319                serializer=serializer,
320            ) from exc
321        if override_serializer_instance:
322            model_instance = model()
323            # pk needs to be set on the model instance otherwise a new one will be generated
324            if "pk" in updated_identifiers:
325                model_instance.pk = updated_identifiers["pk"]
326            serializer.instance = model_instance
327        return serializer
328
329    def _apply_permissions(self, instance: Model, entry: BlueprintEntry):
330        """Apply object-level permissions for an entry"""
331        for perm in entry.get_permissions(self._import):
332            if perm.user is not None:
333                User.objects.get(pk=perm.user).assign_perms_to_managed_role(
334                    perm.permission, instance
335                )
336            if perm.role is not None:
337                role = Role.objects.get(pk=perm.role)
338                role.assign_perms(perm.permission, obj=instance)
339
340    def apply(self) -> bool:
341        """Apply (create/update) models yaml, in database transaction"""
342        try:
343            with atomic():
344                if not self._apply_models():
345                    self.logger.debug("Reverting changes due to error")
346                    raise IntegrityError
347        except IntegrityError:
348            return False
349        self.logger.debug("Committing changes")
350        return True
351
352    def _apply_models(self, raise_errors=False) -> bool:
353        """Apply (create/update) models yaml"""
354        self.__pk_map = {}
355        for entry in self._import.iter_entries():
356            model_app_label, model_name = entry.get_model(self._import).split(".")
357            try:
358                model: type[SerializerModel] = registry.get_model(model_app_label, model_name)
359            except LookupError:
360                self.logger.warning(
361                    "App or Model does not exist", app=model_app_label, model=model_name
362                )
363                return False
364            # Validate each single entry
365            serializer = None
366            try:
367                serializer = self._validate_single(entry)
368            except EntryInvalidError as exc:
369                # For deleting objects we don't need the serializer to be valid
370                if entry.get_state(self._import) == BlueprintEntryDesiredState.ABSENT:
371                    serializer = exc.serializer
372                else:
373                    self.logger.warning(f"Entry invalid: {exc}", entry=entry, error=exc)
374                    if raise_errors:
375                        raise exc
376                    return False
377            if not serializer:
378                continue
379
380            state = entry.get_state(self._import)
381            if state in [
382                BlueprintEntryDesiredState.PRESENT,
383                BlueprintEntryDesiredState.CREATED,
384                BlueprintEntryDesiredState.MUST_CREATED,
385            ]:
386                instance = serializer.instance
387                if (
388                    instance
389                    and not instance._state.adding
390                    and state == BlueprintEntryDesiredState.CREATED
391                ):
392                    self.logger.debug(
393                        "Instance exists, skipping",
394                        model=model,
395                        instance=instance,
396                        pk=instance.pk,
397                    )
398                else:
399                    instance = serializer.save()
400                    self.logger.debug("Updated model", model=instance)
401                if "pk" in entry.identifiers:
402                    self.__pk_map[entry.identifiers["pk"]] = instance.pk
403                entry._state = BlueprintEntryState(instance)
404                self._apply_permissions(instance, entry)
405            elif state == BlueprintEntryDesiredState.ABSENT:
406                instance: Model | None = serializer.instance
407                if instance and instance.pk:
408                    instance.delete()
409                    self.logger.debug("Deleted model", mode=instance)
410                    continue
411                self.logger.debug("Entry to delete with no instance, skipping")
412        return True
413
414    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
415        """Validate loaded blueprint export, ensure all models are allowed
416        and serializers have no errors"""
417        self.logger.debug("Starting blueprint import validation")
418        orig_import = deepcopy(self._import)
419        if self._import.version != 1:
420            self.logger.warning("Invalid blueprint version")
421            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
422        with (
423            transaction_rollback(),
424            capture_logs() as logs,
425        ):
426            successful = self._apply_models(raise_errors=raise_validation_errors)
427            if not successful:
428                self.logger.warning("Blueprint validation failed")
429        self.logger.debug("Finished blueprint import validation")
430        self._import = orig_import
431        return successful, logs

Import Blueprint from raw dict or YAML/JSON

Importer( blueprint: authentik.blueprints.v1.common.Blueprint, context: dict | None = None)
131    def __init__(self, blueprint: Blueprint, context: dict | None = None):
132        self.__pk_map: dict[Any, Model] = {}
133        self._import = blueprint
134        self.logger = get_logger()
135        ctx = self.default_context()
136        always_merger.merge(ctx, self._import.context)
137        if context:
138            always_merger.merge(ctx, context)
139        self._import.context = ctx
logger: structlog.stdlib.BoundLogger
def default_context(self):
141    def default_context(self):
142        """Default context"""
143        context = {
144            "goauthentik.io/rbac/models": rbac_models(),
145            "goauthentik.io/enterprise/licensed": False,
146        }
147        try:
148            from authentik.enterprise.license import LicenseKey
149
150            context["goauthentik.io/enterprise/licensed"] = (
151                LicenseKey.get_total().status().is_valid,
152            )
153        except ModuleNotFoundError:
154            pass
155        return context

Default context

@staticmethod
def from_string( yaml_input: str, context: dict | None = None) -> Importer:
157    @staticmethod
158    def from_string(yaml_input: str, context: dict | None = None) -> Importer:
159        """Parse YAML string and create blueprint importer from it"""
160        import_dict = load(yaml_input, BlueprintLoader)
161        try:
162            _import = from_dict(
163                Blueprint, import_dict, config=Config(cast=[BlueprintEntryDesiredState])
164            )
165        except DaciteError as exc:
166            raise EntryInvalidError from exc
167        return Importer(_import, context)

Parse YAML string and create blueprint importer from it

169    @property
170    def blueprint(self) -> Blueprint:
171        """Get imported blueprint"""
172        return self._import

Get imported blueprint

def apply(self) -> bool:
340    def apply(self) -> bool:
341        """Apply (create/update) models yaml, in database transaction"""
342        try:
343            with atomic():
344                if not self._apply_models():
345                    self.logger.debug("Reverting changes due to error")
346                    raise IntegrityError
347        except IntegrityError:
348            return False
349        self.logger.debug("Committing changes")
350        return True

Apply (create/update) models yaml, in database transaction

def validate( self, raise_validation_errors=False) -> tuple[bool, list[authentik.events.logs.LogEvent]]:
414    def validate(self, raise_validation_errors=False) -> tuple[bool, list[LogEvent]]:
415        """Validate loaded blueprint export, ensure all models are allowed
416        and serializers have no errors"""
417        self.logger.debug("Starting blueprint import validation")
418        orig_import = deepcopy(self._import)
419        if self._import.version != 1:
420            self.logger.warning("Invalid blueprint version")
421            return False, [LogEvent("Invalid blueprint version", log_level="warning", logger=None)]
422        with (
423            transaction_rollback(),
424            capture_logs() as logs,
425        ):
426            successful = self._apply_models(raise_errors=raise_validation_errors)
427            if not successful:
428                self.logger.warning("Blueprint validation failed")
429        self.logger.debug("Finished blueprint import validation")
430        self._import = orig_import
431        return successful, logs

Validate loaded blueprint export, ensure all models are allowed and serializers have no errors