authentik.blueprints.v1.common

transfer common classes

  1"""transfer common classes"""
  2
  3from collections import OrderedDict
  4from collections.abc import Generator, Iterable, Mapping
  5from copy import copy
  6from dataclasses import asdict, dataclass, field, is_dataclass
  7from enum import Enum
  8from functools import reduce
  9from json import JSONDecodeError, loads
 10from operator import ixor
 11from os import getenv
 12from typing import Any, Literal
 13from uuid import UUID
 14
 15from deepmerge import always_merger
 16from django.apps import apps
 17from django.db.models import Model, Q
 18from rest_framework.exceptions import ValidationError
 19from rest_framework.fields import Field
 20from rest_framework.serializers import Serializer
 21from structlog.stdlib import get_logger
 22from yaml import SafeDumper, SafeLoader, ScalarNode, SequenceNode
 23
 24from authentik.lib.models import SerializerModel
 25from authentik.lib.sentry import SentryIgnoredException
 26from authentik.policies.models import PolicyBindingModel
 27
# Module-level structlog logger; used below to report non-fatal problems
# (for example a file that cannot be read by the File tag).
LOGGER = get_logger()
 29
 30
 31class UNSET:
 32    """Used to test whether a key has not been set."""
 33
 34
 35def get_attrs(obj: SerializerModel) -> dict[str, Any]:
 36    """Get object's attributes via their serializer, and convert it to a normal dict"""
 37    serializer: Serializer = obj.serializer(obj)
 38    data = dict(serializer.data)
 39
 40    for field_name, _field in serializer.fields.items():
 41        _field: Field
 42        if field_name not in data:
 43            continue
 44        if _field.read_only:
 45            data.pop(field_name, None)
 46        if field_name.endswith("_set"):
 47            data.pop(field_name, None)
 48    return data
 49
 50
 51@dataclass
 52class BlueprintEntryState:
 53    """State of a single instance"""
 54
 55    instance: Model | None = None
 56
 57
 58class BlueprintEntryDesiredState(Enum):
 59    """State an entry should be reconciled to"""
 60
 61    ABSENT = "absent"
 62    PRESENT = "present"
 63    CREATED = "created"
 64    MUST_CREATED = "must_created"
 65
 66
 67@dataclass
 68class BlueprintEntryPermission:
 69    """Describe object-level permissions"""
 70
 71    permission: str | YAMLTag
 72    user: int | YAMLTag | None = field(default=None)
 73    role: str | YAMLTag | None = field(default=None)
 74
 75
 76@dataclass
 77class BlueprintEntry:
 78    """Single entry of a blueprint"""
 79
 80    model: str | YAMLTag
 81    state: BlueprintEntryDesiredState | YAMLTag = field(default=BlueprintEntryDesiredState.PRESENT)
 82    conditions: list[Any] = field(default_factory=list)
 83    identifiers: dict[str, Any] = field(default_factory=dict)
 84    attrs: dict[str, Any] | None = field(default_factory=dict)
 85    permissions: list[BlueprintEntryPermission] = field(default_factory=list)
 86
 87    id: str | None = None
 88
 89    _state: BlueprintEntryState = field(default_factory=BlueprintEntryState)
 90
 91    def __post_init__(self, *args, **kwargs) -> None:
 92        self.__tag_contexts: list[YAMLTagContext] = []
 93
 94    @staticmethod
 95    def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry:
 96        """Convert a SerializerModel instance to a blueprint Entry"""
 97        identifiers = {
 98            "pk": model.pk,
 99        }
100        all_attrs = get_attrs(model)
101
102        for extra_identifier_name in extra_identifier_names:
103            identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None)
104        return BlueprintEntry(
105            identifiers=identifiers,
106            model=f"{model._meta.app_label}.{model._meta.model_name}",
107            attrs=all_attrs,
108        )
109
110    def get_tag_context(
111        self,
112        depth: int = 0,
113        context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None,
114    ) -> YAMLTagContext:
115        """Get a YAMLTagContext object located at a certain depth in the tag tree"""
116        if depth < 0:
117            raise ValueError("depth must be a positive number or zero")
118
119        if context_tag_type:
120            contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)]
121        else:
122            contexts = self.__tag_contexts
123
124        try:
125            return contexts[-(depth + 1)]
126        except IndexError as exc:
127            raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc
128
129    def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any:
130        """Check if we have any special tags that need handling"""
131        val = copy(value)
132
133        if isinstance(value, YAMLTagContext):
134            self.__tag_contexts.append(value)
135
136        if isinstance(value, YAMLTag):
137            val = value.resolve(self, blueprint)
138
139        if isinstance(value, dict):
140            for key, inner_value in value.items():
141                val[key] = self.tag_resolver(inner_value, blueprint)
142        if isinstance(value, list):
143            for idx, inner_value in enumerate(value):
144                val[idx] = self.tag_resolver(inner_value, blueprint)
145
146        if isinstance(value, YAMLTagContext):
147            self.__tag_contexts.pop()
148
149        return val
150
151    def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]:
152        """Get attributes of this entry, with all yaml tags resolved"""
153        return self.tag_resolver(self.attrs, blueprint)
154
155    def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]:
156        """Get attributes of this entry, with all yaml tags resolved"""
157        return self.tag_resolver(self.identifiers, blueprint)
158
159    def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState:
160        """Get the blueprint state, with yaml tags resolved if present"""
161        return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint))
162
163    def get_model(self, blueprint: Blueprint) -> str:
164        """Get the blueprint model, with yaml tags resolved if present"""
165        return str(self.tag_resolver(self.model, blueprint))
166
167    def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]:
168        """Get permissions of this entry, with all yaml tags resolved"""
169        for perm in self.permissions:
170            yield BlueprintEntryPermission(
171                permission=self.tag_resolver(perm.permission, blueprint),
172                user=self.tag_resolver(perm.user, blueprint),
173                role=self.tag_resolver(perm.role, blueprint),
174            )
175
176    def check_all_conditions_match(self, blueprint: Blueprint) -> bool:
177        """Check all conditions of this entry match (evaluate to True)"""
178        return all(self.tag_resolver(self.conditions, blueprint))
179
180
@dataclass
class BlueprintMetadata:
    """Metadata block of a blueprint (the block itself is optional)"""

    # Name of the blueprint; the only required field
    name: str
    # String-to-string labels; a fresh empty dict per instance by default
    labels: dict[str, str] = field(default_factory=dict)
187
188
@dataclass
class Blueprint:
    """Dataclass representing a complete blueprint, as used for a full export"""

    version: int = 1
    # Either a flat list of entries or a mapping of section name to a list of entries
    entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]] = field(default_factory=list)
    context: dict = field(default_factory=dict)

    metadata: BlueprintMetadata | None = None

    def iter_entries(self) -> Iterable[BlueprintEntry]:
        """Yield every entry, flattening the sections when `entries` is a mapping"""
        entries = self.entries
        if not isinstance(entries, dict):
            yield from entries
            return
        for section_entries in entries.values():
            yield from section_entries
205
206
class YAMLTag:
    """Common base class of every custom YAML tag"""

    def __repr__(self) -> str:
        # The representation is the tag resolved against an empty entry and blueprint
        placeholder_entry = BlueprintEntry("")
        resolved = self.resolve(placeholder_entry, Blueprint())
        return str(resolved)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the value this tag stands for; subclasses must override this"""
        raise NotImplementedError
216
217
class YAMLTagContext:
    """Common base class of every YAML tag that provides a context to nested tags"""

    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the context this tag currently provides; subclasses must override this"""
        raise NotImplementedError
224
225
class KeyOf(YAMLTag):
    """Reference another object by their ID"""

    id_from: str

    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
        super().__init__()
        self.id_from = node.value

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the primary key of the blueprint entry whose `id` equals `id_from`

        Only entries that already have a model instance are considered.

        Raises EntryInvalidError if no such entry exists.
        """
        for _entry in blueprint.iter_entries():
            instance = _entry._state.instance
            if _entry.id != self.id_from or not instance:
                continue
            # Special handling for PolicyBindingModels, as they'll have a different PK
            # which is used when creating policy bindings.
            # The referencing entry's model may itself be a YAML tag, so it is resolved
            # through get_model() instead of calling .lower() on the raw attribute.
            if isinstance(instance, PolicyBindingModel) and (
                entry.get_model(blueprint).lower() == "authentik_policies.policybinding"
            ):
                return instance.pbm_uuid
            return instance.pk
        raise EntryInvalidError.from_entry(
            f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry
        )
249
250
class Env(YAMLTag):
    """Look up an environment variable, with an optional default

    A scalar node gives the variable name; a sequence node gives ``[name, default]``.
    """

    key: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            self.key = node.value
        if isinstance(node, SequenceNode):
            key_node = node.value[0]
            self.key = loader.construct_object(key_node)
            default_node = node.value[1]
            self.default = loader.construct_object(default_node)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        # An unset or empty variable falls back to the default
        env_value = getenv(self.key)
        return env_value if env_value else self.default
268
269
class File(YAMLTag):
    """Read a value from a file, with an optional default

    A scalar node gives the path; a sequence node gives ``[path, default]``.
    """

    path: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            self.path = node.value
        if isinstance(node, SequenceNode):
            path_node = node.value[0]
            self.path = loader.construct_object(path_node)
            default_node = node.value[1]
            self.default = loader.construct_object(default_node)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the file's text with surrounding whitespace stripped

        If the file cannot be opened or read (OSError), a warning is logged and the
        default is returned instead.
        """
        try:
            with open(self.path, encoding="utf8") as handle:
                content = handle.read()
                return content.strip()
        except OSError as exc:
            LOGGER.warning(
                "Failed to read file. Falling back to default value",
                path=self.path,
                exc=exc,
            )
            return self.default
296
297
class Context(YAMLTag):
    """Look up a key in the blueprint's context, with an optional default

    A scalar node gives the key; a sequence node gives ``[key, default]``.
    """

    key: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            self.key = node.value
        if isinstance(node, SequenceNode):
            key_node = node.value[0]
            self.key = loader.construct_object(key_node)
            default_node = node.value[1]
            self.default = loader.construct_object(default_node)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        context = blueprint.context
        value = context[self.key] if self.key in context else self.default
        # The looked-up value (or the default) may itself be a tag
        if not isinstance(value, YAMLTag):
            return value
        return value.resolve(entry, blueprint)
320
321
class ParseJSON(YAMLTag):
    """Parse the scalar value of the node as JSON"""

    raw: str

    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
        super().__init__()
        self.raw = node.value

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the decoded JSON document; invalid JSON raises EntryInvalidError"""
        try:
            parsed = loads(self.raw)
        except JSONDecodeError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
        return parsed
336
337
class Format(YAMLTag):
    """Format a string

    The first sequence item is a printf-style format string, the remaining items are
    its positional arguments (which may themselves be YAML tags).
    """

    format_string: str
    args: list[Any]

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.format_string = loader.construct_object(node.value[0])
        self.args = [loader.construct_object(raw_node) for raw_node in node.value[1:]]

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the format string with all (resolved) arguments applied

        Raises EntryInvalidError if the arguments do not fit the format string or the
        format string itself is malformed.
        """
        args = [
            arg.resolve(entry, blueprint) if isinstance(arg, YAMLTag) else arg for arg in self.args
        ]

        try:
            return self.format_string % tuple(args)
        except (TypeError, ValueError) as exc:
            # TypeError: wrong number/type of arguments.
            # ValueError: malformed format string (e.g. incomplete format or an
            # unsupported format character).
            raise EntryInvalidError.from_entry(exc, entry) from exc
363
364
class Find(YAMLTag):
    """Find any object and return its primary key

    The first sequence item names the model (``app_label.model_name``); every further
    item is a sequence whose first two elements are a query key and a query value.
    """

    model_name: str | YAMLTag
    conditions: list[list]

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.model_name = loader.construct_object(node.value[0])
        self.conditions = [
            [loader.construct_object(value_node) for value_node in condition_node.value]
            for condition_node in node.value[1:]
        ]

    def _get_instance(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the first object matching all conditions, or None if nothing matches"""

        def resolved(item: Any) -> Any:
            # Tags are resolved against the current entry, anything else is used as-is
            return item.resolve(entry, blueprint) if isinstance(item, YAMLTag) else item

        model_name = resolved(self.model_name)

        try:
            model_class = apps.get_model(*model_name.split("."))
        except LookupError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc

        # All conditions are AND-ed together
        query = Q()
        for cond in self.conditions:
            query_key = resolved(cond[0])
            query_value = resolved(cond[1])
            query &= Q(**{query_key: query_value})
        return model_class.objects.filter(query).first()

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        instance = self._get_instance(entry, blueprint)
        return instance.pk if instance else None
410
411
class FindObject(Find):
    """Find any object and return its serialized data"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the serializer data of the matched object, or None without a match

        Raises EntryInvalidError if the matched object is not a SerializerModel.
        """
        instance = self._get_instance(entry, blueprint)
        if not instance:
            return None
        if isinstance(instance, SerializerModel):
            return instance.serializer(instance=instance).data
        raise EntryInvalidError.from_entry(
            f"Model {self.model_name} is not resolvable through FindObject", entry
        )
424
425
class Condition(YAMLTag):
    """Reduce all values to a single boolean

    The first sequence item selects the mode, the remaining items are the values
    (which may themselves be YAML tags).
    """

    mode: Literal["AND", "NAND", "OR", "NOR", "XOR", "XNOR"]
    args: list[Any]

    # mode -> function taking the tuple of booleans
    _COMPARATORS = {
        # Using all and any here instead of from operator import iand, ior
        # to improve performance
        "AND": all,
        "NAND": lambda args: not all(args),
        "OR": any,
        "NOR": lambda args: not any(args),
        "XOR": lambda args: reduce(ixor, args) if len(args) > 1 else args[0],
        "XNOR": lambda args: not (reduce(ixor, args) if len(args) > 1 else args[0]),
    }

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.mode = loader.construct_object(node.value[0])
        self.args = [loader.construct_object(raw_node) for raw_node in node.value[1:]]

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the boolean result of applying the selected mode to all values

        Raises EntryInvalidError if no values were given or the mode is unknown.
        """
        values = [
            arg.resolve(entry, blueprint) if isinstance(arg, YAMLTag) else arg for arg in self.args
        ]

        if not values:
            raise EntryInvalidError.from_entry(
                "At least one value is required after mode selection.", entry
            )

        try:
            # The mode is matched case-insensitively
            comparator = self._COMPARATORS[self.mode.upper()]
            return comparator(tuple(map(bool, values)))
        except (TypeError, KeyError) as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
468
469
class If(YAMLTag):
    """Select one of two YAML values depending on a condition

    With only a condition given, the tag resolves to True or False.
    """

    condition: Any
    when_true: Any
    when_false: Any

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.condition = loader.construct_object(node.value[0])
        has_branches = len(node.value) != 1
        self.when_true = loader.construct_object(node.value[1]) if has_branches else True
        self.when_false = loader.construct_object(node.value[2]) if has_branches else False

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        condition = self.condition
        if isinstance(condition, YAMLTag):
            condition = condition.resolve(entry, blueprint)

        try:
            # The chosen branch may contain further tags, so it goes through the resolver
            chosen = self.when_true if condition else self.when_false
            return entry.tag_resolver(chosen, blueprint)
        except TypeError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
500
501
class Enumerate(YAMLTag, YAMLTagContext):
    """Resolve a body once per item of an iterable and collect the results

    Sequence items: the iterable, the output mode ("SEQ" or "MAP") and the item body.
    While the body is resolved, this tag provides the current ``(index or key, value)``
    pair as its context.
    """

    iterable: YAMLTag | Iterable
    item_body: Any
    output_body: Literal["SEQ", "MAP"]

    # output mode -> (result type, function adding one resolved body to the result)
    _OUTPUT_BODIES = {
        "SEQ": (list, lambda a, b: [*a, b]),
        "MAP": (
            dict,
            lambda a, b: always_merger.merge(a, {b[0]: b[1]} if isinstance(b, tuple | list) else b),
        ),
    }

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.iterable = loader.construct_object(node.value[0])
        self.output_body = loader.construct_object(node.value[1])
        self.item_body = loader.construct_object(node.value[2])
        self.__current_context: tuple[Any, Any] = tuple()

    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        return self.__current_context

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        source = self.iterable
        if isinstance(source, EnumeratedItem) and source.depth == 0:
            raise EntryInvalidError.from_entry(
                f"{self.__class__.__name__} tag's iterable references this tag's context. "
                "This is a noop. Check you are setting depth bigger than 0.",
                entry,
            )

        iterable = source.resolve(entry, blueprint) if isinstance(source, YAMLTag) else source

        if not isinstance(iterable, Iterable):
            raise EntryInvalidError.from_entry(
                f"{self.__class__.__name__}'s iterable must be an iterable "
                "such as a sequence or a mapping",
                entry,
            )

        # Mappings are walked as (key, value) pairs, everything else as (index, value)
        if isinstance(iterable, Mapping):
            pairs = tuple(iterable.items())
        else:
            pairs = tuple(enumerate(iterable))

        try:
            output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()]
        except KeyError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc

        result = output_class()

        self.__current_context = tuple()

        try:
            for pair in pairs:
                self.__current_context = pair
                resolved_body = entry.tag_resolver(self.item_body, blueprint)
                result = add_fn(result, resolved_body)
                if not isinstance(result, output_class):
                    raise EntryInvalidError.from_entry(
                        f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry
                    )
        finally:
            # Never leave a stale item behind as context
            self.__current_context = tuple()

        return result
574
575
class EnumeratedItem(YAMLTag):
    """Get the current (index, value) context provided by an enclosing Enumerate tag"""

    # How many Enumerate levels to go outwards; 0 is the innermost one
    depth: int

    _SUPPORTED_CONTEXT_TAGS = (Enumerate,)

    def __init__(self, _loader: BlueprintLoader, node: ScalarNode) -> None:
        super().__init__()
        self.depth = int(node.value)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        try:
            context_tag: Enumerate = entry.get_tag_context(
                depth=self.depth,
                context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS,
            )
        except ValueError as exc:
            if self.depth == 0:
                # A failure at depth 0 means there is no enclosing Enumerate at all
                message = (
                    f"{self.__class__.__name__} tags are only usable "
                    f"inside an {Enumerate.__name__} tag"
                )
            else:
                message = f"{self.__class__.__name__} tag: {exc}"
            raise EntryInvalidError.from_entry(message, entry) from exc

        return context_tag.get_context(entry, blueprint)
606
607
class Index(EnumeratedItem):
    """Get the index (first element) of the current Enumerate context"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        context = super().resolve(entry, blueprint)

        try:
            index = context[0]
        except IndexError as exc:  # pragma: no cover
            raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
        return index
618
619
class Value(EnumeratedItem):
    """Get the value (second element) of the current Enumerate context"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        context = super().resolve(entry, blueprint)

        try:
            item = context[1]
        except IndexError as exc:  # pragma: no cover
            raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
        return item
630
631
class AtIndex(YAMLTag):
    """Get value at index of a sequence or mapping

    Sequence items: the object, the index/key and an optional default that is used
    when the index is out of range or the key does not exist.
    """

    obj: YAMLTag | dict | list | tuple
    attribute: int | str | YAMLTag
    # UNSET (the class itself) marks "no default given", so None stays a valid default
    default: Any | type[UNSET]

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.obj = loader.construct_object(node.value[0])
        self.attribute = loader.construct_object(node.value[1])
        if len(node.value) == 2:  # noqa: PLR2004
            self.default = UNSET
        else:
            self.default = loader.construct_object(node.value[2])

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return ``obj[attribute]``, falling back to the default if one was given

        Raises EntryInvalidError if the index/key cannot be used with the object, or
        if it is missing and no default was given.
        """
        if isinstance(self.obj, YAMLTag):
            obj = self.obj.resolve(entry, blueprint)
        else:
            obj = self.obj
        if isinstance(self.attribute, YAMLTag):
            attribute = self.attribute.resolve(entry, blueprint)
        else:
            attribute = self.attribute

        if isinstance(obj, list | tuple):
            try:
                return obj[attribute]
            except TypeError as exc:
                raise EntryInvalidError.from_entry(
                    f"Invalid index for list: {attribute}", entry
                ) from exc
            except IndexError as exc:
                if self.default is UNSET:
                    raise EntryInvalidError.from_entry(
                        f"Index out of range: {attribute}", entry
                    ) from exc
                return self.default

        try:
            if attribute in obj:
                return obj[attribute]
        except TypeError as exc:
            # obj is not a container (e.g. a lookup that resolved to None) or the key
            # is unhashable; report it as an invalid entry, like the list branch does.
            raise EntryInvalidError.from_entry(
                f"Invalid key for {type(obj).__name__}: {attribute}", entry
            ) from exc

        if self.default is UNSET:
            raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry)
        return self.default
677
678
class BlueprintDumper(SafeDumper):
    """YAML dumper that is able to dump the blueprint dataclasses"""

    default_flow_style = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_representer(UUID, lambda dumper, value: dumper.represent_str(str(value)))
        self.add_representer(
            OrderedDict, lambda dumper, value: dumper.represent_dict(dict(value))
        )
        self.add_representer(Enum, lambda dumper, value: dumper.represent_str(value.value))
        self.add_representer(
            BlueprintEntryDesiredState, lambda dumper, value: dumper.represent_str(value.value)
        )
        # Registered for None: represent the data as its str()
        self.add_representer(None, lambda dumper, value: dumper.represent_str(str(value)))

    def ignore_aliases(self, data):
        """Always true, so that no YAML anchors/aliases are emitted"""
        return True

    def represent(self, data) -> None:
        if not is_dataclass(data):
            return super().represent(data)

        def factory(items):
            # Internal state variables are not part of the dump
            cleaned = {key: value for key, value in items if key != "_state"}
            # Future-proof to only remove the ID if we don't set a value
            if "id" in cleaned and cleaned["id"] is None:
                del cleaned["id"]
            return cleaned

        return super().represent(asdict(data, dict_factory=factory))
712
713
class BlueprintLoader(SafeLoader):
    """YAML loader that understands the custom blueprint tags"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # (tag, constructor) pairs, registered in this order
        tag_constructors = (
            ("!KeyOf", KeyOf),
            ("!Find", Find),
            ("!FindObject", FindObject),
            ("!Context", Context),
            ("!Format", Format),
            ("!Condition", Condition),
            ("!If", If),
            ("!Env", Env),
            ("!File", File),
            ("!Enumerate", Enumerate),
            ("!Value", Value),
            ("!Index", Index),
            ("!AtIndex", AtIndex),
            ("!ParseJSON", ParseJSON),
        )
        for tag, constructor in tag_constructors:
            self.add_constructor(tag, constructor)
733
734
class EntryInvalidError(SentryIgnoredException):
    """Raised when a blueprint entry is invalid"""

    entry_model: str | None
    entry_id: str | None
    validation_error: ValidationError | None
    serializer: Serializer | None = None

    def __init__(
        self, *args: object, validation_error: ValidationError | None = None, **kwargs
    ) -> None:
        super().__init__(*args)
        self.entry_model = None
        self.entry_id = None
        self.validation_error = validation_error
        # Any further keyword argument is stored as an attribute of the error
        for attribute_name, attribute_value in kwargs.items():
            setattr(self, attribute_name, attribute_value)

    @staticmethod
    def from_entry(
        msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs
    ) -> EntryInvalidError:
        """Build an EntryInvalidError that carries the model and id of `entry`"""
        invalid = EntryInvalidError(msg_or_exc, *args, **kwargs)
        if isinstance(msg_or_exc, ValidationError):
            invalid.validation_error = msg_or_exc
        # Depending on where the error happens, model and id might still be YAMLTag
        # instances, so both are converted to strings
        invalid.entry_model = str(entry.model)
        invalid.entry_id = str(entry.id)
        return invalid
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class UNSET:
class UNSET:
    """Sentinel type marking a value that was never provided.

    The class object itself serves as the marker and is compared by identity,
    which keeps ``None`` available as a legitimate value.
    """

Used to test whether a key has not been set.

def get_attrs(obj: authentik.lib.models.SerializerModel) -> dict[str, typing.Any]:
def get_attrs(obj: SerializerModel) -> dict[str, Any]:
    """Serialize `obj` with its own serializer and return the writable attributes as a dict

    Keys belonging to read-only serializer fields, and keys whose name ends in
    ``_set``, are left out of the result.
    """
    serializer: Serializer = obj.serializer(obj)
    serialized = dict(serializer.data)

    # Only fields that actually appear in the serialized data are considered
    excluded = {
        name
        for name, serializer_field in serializer.fields.items()
        if name in serialized and (serializer_field.read_only or name.endswith("_set"))
    }
    return {key: value for key, value in serialized.items() if key not in excluded}

Get object's attributes via their serializer, and convert it to a normal dict

@dataclass
class BlueprintEntryState:
@dataclass
class BlueprintEntryState:
    """Runtime state tracked for a single blueprint entry"""

    # Model instance associated with the entry; None while there is none
    instance: Model | None = field(default=None)

State of a single instance

BlueprintEntryState(instance: django.db.models.base.Model | None = None)
instance: django.db.models.base.Model | None = None
class BlueprintEntryDesiredState(enum.Enum):
class BlueprintEntryDesiredState(Enum):
    """Target state an entry is reconciled to

    The member values are the strings used to select a state by value,
    e.g. ``BlueprintEntryDesiredState("present")``.
    """

    ABSENT = "absent"
    PRESENT = "present"
    CREATED = "created"
    MUST_CREATED = "must_created"

State an entry should be reconciled to

ABSENT = <BlueprintEntryDesiredState.ABSENT: 'absent'>
PRESENT = <BlueprintEntryDesiredState.PRESENT: 'present'>
CREATED = <BlueprintEntryDesiredState.CREATED: 'created'>
MUST_CREATED = <BlueprintEntryDesiredState.MUST_CREATED: 'must_created'>
@dataclass
class BlueprintEntryPermission:
@dataclass
class BlueprintEntryPermission:
    """Object-level permission attached to a blueprint entry"""

    # Each field may be a literal or a YAML tag that is resolved later
    permission: str | YAMLTag
    user: int | YAMLTag | None = None
    role: str | YAMLTag | None = None

Describe object-level permissions

BlueprintEntryPermission( permission: str | YAMLTag, user: int | YAMLTag | None = None, role: str | YAMLTag | None = None)
permission: str | YAMLTag
user: int | YAMLTag | None = None
role: str | YAMLTag | None = None
@dataclass
class BlueprintEntry:
 77@dataclass
 78class BlueprintEntry:
 79    """Single entry of a blueprint"""
 80
 81    model: str | YAMLTag
 82    state: BlueprintEntryDesiredState | YAMLTag = field(default=BlueprintEntryDesiredState.PRESENT)
 83    conditions: list[Any] = field(default_factory=list)
 84    identifiers: dict[str, Any] = field(default_factory=dict)
 85    attrs: dict[str, Any] | None = field(default_factory=dict)
 86    permissions: list[BlueprintEntryPermission] = field(default_factory=list)
 87
 88    id: str | None = None
 89
 90    _state: BlueprintEntryState = field(default_factory=BlueprintEntryState)
 91
 92    def __post_init__(self, *args, **kwargs) -> None:
 93        self.__tag_contexts: list[YAMLTagContext] = []
 94
 95    @staticmethod
 96    def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry:
 97        """Convert a SerializerModel instance to a blueprint Entry"""
 98        identifiers = {
 99            "pk": model.pk,
100        }
101        all_attrs = get_attrs(model)
102
103        for extra_identifier_name in extra_identifier_names:
104            identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None)
105        return BlueprintEntry(
106            identifiers=identifiers,
107            model=f"{model._meta.app_label}.{model._meta.model_name}",
108            attrs=all_attrs,
109        )
110
111    def get_tag_context(
112        self,
113        depth: int = 0,
114        context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None,
115    ) -> YAMLTagContext:
116        """Get a YAMLTagContext object located at a certain depth in the tag tree"""
117        if depth < 0:
118            raise ValueError("depth must be a positive number or zero")
119
120        if context_tag_type:
121            contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)]
122        else:
123            contexts = self.__tag_contexts
124
125        try:
126            return contexts[-(depth + 1)]
127        except IndexError as exc:
128            raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc
129
130    def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any:
131        """Check if we have any special tags that need handling"""
132        val = copy(value)
133
134        if isinstance(value, YAMLTagContext):
135            self.__tag_contexts.append(value)
136
137        if isinstance(value, YAMLTag):
138            val = value.resolve(self, blueprint)
139
140        if isinstance(value, dict):
141            for key, inner_value in value.items():
142                val[key] = self.tag_resolver(inner_value, blueprint)
143        if isinstance(value, list):
144            for idx, inner_value in enumerate(value):
145                val[idx] = self.tag_resolver(inner_value, blueprint)
146
147        if isinstance(value, YAMLTagContext):
148            self.__tag_contexts.pop()
149
150        return val
151
152    def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]:
153        """Get attributes of this entry, with all yaml tags resolved"""
154        return self.tag_resolver(self.attrs, blueprint)
155
156    def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]:
157        """Get attributes of this entry, with all yaml tags resolved"""
158        return self.tag_resolver(self.identifiers, blueprint)
159
160    def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState:
161        """Get the blueprint state, with yaml tags resolved if present"""
162        return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint))
163
164    def get_model(self, blueprint: Blueprint) -> str:
165        """Get the blueprint model, with yaml tags resolved if present"""
166        return str(self.tag_resolver(self.model, blueprint))
167
168    def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]:
169        """Get permissions of this entry, with all yaml tags resolved"""
170        for perm in self.permissions:
171            yield BlueprintEntryPermission(
172                permission=self.tag_resolver(perm.permission, blueprint),
173                user=self.tag_resolver(perm.user, blueprint),
174                role=self.tag_resolver(perm.role, blueprint),
175            )
176
177    def check_all_conditions_match(self, blueprint: Blueprint) -> bool:
178        """Check all conditions of this entry match (evaluate to True)"""
179        return all(self.tag_resolver(self.conditions, blueprint))

Single entry of a blueprint

BlueprintEntry( model: str | YAMLTag, state: BlueprintEntryDesiredState | YAMLTag = <BlueprintEntryDesiredState.PRESENT: 'present'>, conditions: list[typing.Any] = <factory>, identifiers: dict[str, typing.Any] = <factory>, attrs: dict[str, Any] | None = <factory>, permissions: list[BlueprintEntryPermission] = <factory>, id: str | None = None, _state: BlueprintEntryState = <factory>)
model: str | YAMLTag
conditions: list[typing.Any]
identifiers: dict[str, typing.Any]
attrs: dict[str, Any] | None
permissions: list[BlueprintEntryPermission]
id: str | None = None
@staticmethod
def from_model( model: authentik.lib.models.SerializerModel, *extra_identifier_names: str) -> BlueprintEntry:
 95    @staticmethod
 96    def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry:
 97        """Convert a SerializerModel instance to a blueprint Entry"""
 98        identifiers = {
 99            "pk": model.pk,
100        }
101        all_attrs = get_attrs(model)
102
103        for extra_identifier_name in extra_identifier_names:
104            identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None)
105        return BlueprintEntry(
106            identifiers=identifiers,
107            model=f"{model._meta.app_label}.{model._meta.model_name}",
108            attrs=all_attrs,
109        )

Convert a SerializerModel instance to a blueprint Entry

def get_tag_context( self, depth: int = 0, context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None) -> YAMLTagContext:
111    def get_tag_context(
112        self,
113        depth: int = 0,
114        context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None,
115    ) -> YAMLTagContext:
116        """Get a YAMLTagContext object located at a certain depth in the tag tree"""
117        if depth < 0:
118            raise ValueError("depth must be a positive number or zero")
119
120        if context_tag_type:
121            contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)]
122        else:
123            contexts = self.__tag_contexts
124
125        try:
126            return contexts[-(depth + 1)]
127        except IndexError as exc:
128            raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc

Get a YAMLTagContext object located at a certain depth in the tag tree

def tag_resolver( self, value: Any, blueprint: Blueprint) -> Any:
130    def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any:
131        """Check if we have any special tags that need handling"""
132        val = copy(value)
133
134        if isinstance(value, YAMLTagContext):
135            self.__tag_contexts.append(value)
136
137        if isinstance(value, YAMLTag):
138            val = value.resolve(self, blueprint)
139
140        if isinstance(value, dict):
141            for key, inner_value in value.items():
142                val[key] = self.tag_resolver(inner_value, blueprint)
143        if isinstance(value, list):
144            for idx, inner_value in enumerate(value):
145                val[idx] = self.tag_resolver(inner_value, blueprint)
146
147        if isinstance(value, YAMLTagContext):
148            self.__tag_contexts.pop()
149
150        return val

Check if we have any special tags that need handling

def get_attrs( self, blueprint: Blueprint) -> dict[str, typing.Any]:
152    def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]:
153        """Get attributes of this entry, with all yaml tags resolved"""
154        return self.tag_resolver(self.attrs, blueprint)

Get attributes of this entry, with all yaml tags resolved

def get_identifiers( self, blueprint: Blueprint) -> dict[str, typing.Any]:
156    def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]:
157        """Get attributes of this entry, with all yaml tags resolved"""
158        return self.tag_resolver(self.identifiers, blueprint)

Get identifiers of this entry, with all yaml tags resolved

def get_state( self, blueprint: Blueprint) -> BlueprintEntryDesiredState:
160    def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState:
161        """Get the blueprint state, with yaml tags resolved if present"""
162        return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint))

Get the blueprint state, with yaml tags resolved if present

def get_model(self, blueprint: Blueprint) -> str:
164    def get_model(self, blueprint: Blueprint) -> str:
165        """Get the blueprint model, with yaml tags resolved if present"""
166        return str(self.tag_resolver(self.model, blueprint))

Get the blueprint model, with yaml tags resolved if present

def get_permissions( self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]:
168    def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]:
169        """Get permissions of this entry, with all yaml tags resolved"""
170        for perm in self.permissions:
171            yield BlueprintEntryPermission(
172                permission=self.tag_resolver(perm.permission, blueprint),
173                user=self.tag_resolver(perm.user, blueprint),
174                role=self.tag_resolver(perm.role, blueprint),
175            )

Get permissions of this entry, with all yaml tags resolved

def check_all_conditions_match(self, blueprint: Blueprint) -> bool:
177    def check_all_conditions_match(self, blueprint: Blueprint) -> bool:
178        """Check all conditions of this entry match (evaluate to True)"""
179        return all(self.tag_resolver(self.conditions, blueprint))

Check all conditions of this entry match (evaluate to True)

@dataclass
class BlueprintMetadata:
@dataclass
class BlueprintMetadata:
    """Optional metadata attached to a blueprint"""

    # Name of the blueprint (required).
    name: str
    # Free-form string labels; every instance gets its own empty dict by default.
    labels: dict[str, str] = field(default_factory=dict)

Optional blueprint metadata

BlueprintMetadata(name: str, labels: dict[str, str] = <factory>)
name: str
labels: dict[str, str]
@dataclass
class Blueprint:
@dataclass
class Blueprint:
    """Dataclass used for a full export"""

    version: int = 1
    # Either a flat list of entries, or lists of entries grouped by section name.
    entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]] = field(default_factory=list)
    context: dict = field(default_factory=dict)

    metadata: BlueprintMetadata | None = None

    def iter_entries(self) -> Iterable[BlueprintEntry]:
        """Yield every entry, walking through all sections when entries are grouped"""
        if isinstance(self.entries, dict):
            sections = self.entries.values()
        else:
            sections = (self.entries,)
        for section in sections:
            yield from section

Dataclass used for a full export

Blueprint( version: int = 1, entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]] = <factory>, context: dict = <factory>, metadata: BlueprintMetadata | None = None)
version: int = 1
entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]]
context: dict
metadata: BlueprintMetadata | None = None
def iter_entries(self) -> Iterable[BlueprintEntry]:
200    def iter_entries(self) -> Iterable[BlueprintEntry]:
201        if isinstance(self.entries, dict):
202            for _section, entries in self.entries.items():
203                yield from entries
204        else:
205            yield from self.entries
class YAMLTag:
class YAMLTag:
    """Base class for all YAML Tags"""

    def __repr__(self) -> str:
        # The representation is the tag resolved against an empty entry and blueprint.
        placeholder_entry = BlueprintEntry("")
        placeholder_blueprint = Blueprint()
        resolved = self.resolve(placeholder_entry, placeholder_blueprint)
        return str(resolved)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Implement yaml tag logic; subclasses must override this"""
        raise NotImplementedError

Base class for all YAML Tags

def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
214    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
215        """Implement yaml tag logic"""
216        raise NotImplementedError

Implement yaml tag logic

class YAMLTagContext:
class YAMLTagContext:
    """Base class for all YAML Tag Contexts"""

    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Implement yaml tag context logic; subclasses must override this"""
        raise NotImplementedError

Base class for all YAML Tag Contexts

def get_context( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
222    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
223        """Implement yaml tag context logic"""
224        raise NotImplementedError

Implement yaml tag context logic

class KeyOf(YAMLTag):
class KeyOf(YAMLTag):
    """Reference another object by their ID"""

    id_from: str

    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
        """Store the scalar value of `node` as the `id` of the entry to look up"""
        super().__init__()
        self.id_from = node.value

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the primary key of the blueprint entry whose `id` is `id_from`

        Only entries that already have a model instance are considered.
        Raises EntryInvalidError when no such entry exists.
        """
        for _entry in blueprint.iter_entries():
            instance = _entry._state.instance
            if _entry.id != self.id_from or not instance:
                continue
            # Special handling for PolicyBindingModels, as they'll have a different PK
            # which is used when creating policy bindings.
            # The referencing entry's model may itself be an unresolved YAML tag,
            # so it is resolved through get_model() before being compared.
            if (
                isinstance(instance, PolicyBindingModel)
                and entry.get_model(blueprint).lower() == "authentik_policies.policybinding"
            ):
                return instance.pbm_uuid
            return instance.pk
        raise EntryInvalidError.from_entry(
            f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry
        )

Reference another object by their ID

KeyOf( loader: BlueprintLoader, node: yaml.nodes.ScalarNode)
232    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
233        super().__init__()
234        self.id_from = node.value
id_from: str
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
236    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
237        for _entry in blueprint.iter_entries():
238            if _entry.id == self.id_from and _entry._state.instance:
239                # Special handling for PolicyBindingModels, as they'll have a different PK
240                # which is used when creating policy bindings
241                if (
242                    isinstance(_entry._state.instance, PolicyBindingModel)
243                    and entry.model.lower() == "authentik_policies.policybinding"
244                ):
245                    return _entry._state.instance.pbm_uuid
246                return _entry._state.instance.pk
247        raise EntryInvalidError.from_entry(
248            f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry
249        )

Implement yaml tag logic

class Env(YAMLTag):
class Env(YAMLTag):
    """Lookup environment variable with optional default"""

    key: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        """Read the variable name, and for sequence nodes also the default value"""
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            # Scalar form: the node's value is the variable name.
            self.key = node.value
        if isinstance(node, SequenceNode):
            # Sequence form: first item is the variable name, second the default.
            items = node.value
            self.key = loader.construct_object(items[0])
            self.default = loader.construct_object(items[1])

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the variable's value, or the default when it is unset or empty"""
        env_value = getenv(self.key)
        if env_value:
            return env_value
        return self.default

Lookup environment variable with optional default

Env( loader: BlueprintLoader, node: yaml.nodes.ScalarNode | yaml.nodes.SequenceNode)
258    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
259        super().__init__()
260        self.default = None
261        if isinstance(node, ScalarNode):
262            self.key = node.value
263        if isinstance(node, SequenceNode):
264            self.key = loader.construct_object(node.value[0])
265            self.default = loader.construct_object(node.value[1])
key: str
default: Any | None
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
267    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
268        return getenv(self.key) or self.default

Implement yaml tag logic

class File(YAMLTag):
class File(YAMLTag):
    """Lookup file with optional default"""

    path: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        """Read the file path, and for sequence nodes also the default value"""
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            # Scalar form: the node's value is the path.
            self.path = node.value
        if isinstance(node, SequenceNode):
            # Sequence form: first item is the path, second the default.
            items = node.value
            self.path = loader.construct_object(items[0])
            self.default = loader.construct_object(items[1])

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the stripped UTF-8 content of the file, or the default if it can't be read"""
        try:
            with open(self.path, encoding="utf8") as handle:
                content = handle.read()
                return content.strip()
        except OSError as exc:
            LOGGER.warning(
                "Failed to read file. Falling back to default value",
                path=self.path,
                exc=exc,
            )
            return self.default

Lookup file with optional default

File( loader: BlueprintLoader, node: yaml.nodes.ScalarNode | yaml.nodes.SequenceNode)
277    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
278        super().__init__()
279        self.default = None
280        if isinstance(node, ScalarNode):
281            self.path = node.value
282        if isinstance(node, SequenceNode):
283            self.path = loader.construct_object(node.value[0])
284            self.default = loader.construct_object(node.value[1])
path: str
default: Any | None
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
286    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
287        try:
288            with open(self.path, encoding="utf8") as _file:
289                return _file.read().strip()
290        except OSError as exc:
291            LOGGER.warning(
292                "Failed to read file. Falling back to default value",
293                path=self.path,
294                exc=exc,
295            )
296            return self.default

Implement yaml tag logic

class Context(YAMLTag):
class Context(YAMLTag):
    """Lookup key from instance context"""

    key: str
    default: Any | None

    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
        """Read the context key, and for sequence nodes also the default value"""
        super().__init__()
        self.default = None
        if isinstance(node, ScalarNode):
            # Scalar form: the node's value is the key.
            self.key = node.value
        if isinstance(node, SequenceNode):
            # Sequence form: first item is the key, second the default.
            items = node.value
            self.key = loader.construct_object(items[0])
            self.default = loader.construct_object(items[1])

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the blueprint context value for the key, or the default if absent

        When the selected value is itself a YAML tag, it is resolved first.
        """
        context = blueprint.context
        value = context[self.key] if self.key in context else self.default
        if not isinstance(value, YAMLTag):
            return value
        return value.resolve(entry, blueprint)

Lookup key from instance context

Context( loader: BlueprintLoader, node: yaml.nodes.ScalarNode | yaml.nodes.SequenceNode)
305    def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None:
306        super().__init__()
307        self.default = None
308        if isinstance(node, ScalarNode):
309            self.key = node.value
310        if isinstance(node, SequenceNode):
311            self.key = loader.construct_object(node.value[0])
312            self.default = loader.construct_object(node.value[1])
key: str
default: Any | None
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
314    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
315        value = self.default
316        if self.key in blueprint.context:
317            value = blueprint.context[self.key]
318        if isinstance(value, YAMLTag):
319            return value.resolve(entry, blueprint)
320        return value

Implement yaml tag logic

class ParseJSON(YAMLTag):
class ParseJSON(YAMLTag):
    """Parse JSON from context/env/etc value"""

    raw: str

    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
        """Keep the node's scalar value as the raw JSON text"""
        super().__init__()
        self.raw = node.value

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Decode the raw text as JSON; raise EntryInvalidError if it is not valid JSON"""
        try:
            parsed = loads(self.raw)
        except JSONDecodeError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc
        return parsed

Parse JSON from context/env/etc value

ParseJSON( loader: BlueprintLoader, node: yaml.nodes.ScalarNode)
328    def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None:
329        super().__init__()
330        self.raw = node.value
raw: str
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
332    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
333        try:
334            return loads(self.raw)
335        except JSONDecodeError as exc:
336            raise EntryInvalidError.from_entry(exc, entry) from exc

Implement yaml tag logic

class Format(YAMLTag):
class Format(YAMLTag):
    """Format a string"""

    format_string: str
    args: list[Any]

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        """Read the format string (first item) and its arguments (remaining items)"""
        super().__init__()
        self.format_string = loader.construct_object(node.value[0])
        self.args = [loader.construct_object(raw_node) for raw_node in node.value[1:]]

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Apply printf-style (`%`) formatting with the arguments, resolving tag arguments first

        Raises EntryInvalidError when the format string and the arguments do
        not fit together or the format string itself is malformed.
        """
        args = [
            arg.resolve(entry, blueprint) if isinstance(arg, YAMLTag) else arg for arg in self.args
        ]

        try:
            return self.format_string % tuple(args)
        except (TypeError, ValueError) as exc:
            # TypeError: argument count/type mismatch. ValueError: malformed
            # format string, e.g. an unsupported format character.
            raise EntryInvalidError.from_entry(exc, entry) from exc

Format a string

Format( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
345    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
346        super().__init__()
347        self.format_string = loader.construct_object(node.value[0])
348        self.args = []
349        for raw_node in node.value[1:]:
350            self.args.append(loader.construct_object(raw_node))
format_string: str
args: list[typing.Any]
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
352    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
353        args = []
354        for arg in self.args:
355            if isinstance(arg, YAMLTag):
356                args.append(arg.resolve(entry, blueprint))
357            else:
358                args.append(arg)
359
360        try:
361            return self.format_string % tuple(args)
362        except TypeError as exc:
363            raise EntryInvalidError.from_entry(exc, entry) from exc

Implement yaml tag logic

class Find(YAMLTag):
class Find(YAMLTag):
    """Find any object primary key"""

    model_name: str | YAMLTag
    conditions: list[list]

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        """Read the model name (first item) and the lookup conditions (remaining items)"""
        super().__init__()
        self.model_name = loader.construct_object(node.value[0])
        # Every remaining item is a sequence node whose children form one condition.
        self.conditions = [
            [loader.construct_object(item) for item in raw_node.value]
            for raw_node in node.value[1:]
        ]

    def _get_instance(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the first instance matching all conditions, or None

        Raises EntryInvalidError when the model cannot be looked up.
        """
        model_name = self.model_name
        if isinstance(model_name, YAMLTag):
            model_name = model_name.resolve(entry, blueprint)

        try:
            model_class = apps.get_model(*model_name.split("."))
        except LookupError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc

        # All conditions are AND-ed together; key and value may each be a tag.
        query = Q()
        for cond in self.conditions:
            lookup = cond[0].resolve(entry, blueprint) if isinstance(cond[0], YAMLTag) else cond[0]
            wanted = cond[1].resolve(entry, blueprint) if isinstance(cond[1], YAMLTag) else cond[1]
            query &= Q(**{lookup: wanted})
        return model_class.objects.filter(query).first()

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the primary key of the matching instance, or None if nothing matches"""
        instance = self._get_instance(entry, blueprint)
        return instance.pk if instance else None

Find any object primary key

Find( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
372    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
373        super().__init__()
374        self.model_name = loader.construct_object(node.value[0])
375        self.conditions = []
376        for raw_node in node.value[1:]:
377            values = []
378            for node_values in raw_node.value:
379                values.append(loader.construct_object(node_values))
380            self.conditions.append(values)
model_name: str | YAMLTag
conditions: list[list]
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
406    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
407        instance = self._get_instance(entry, blueprint)
408        if instance:
409            return instance.pk
410        return None

Implement yaml tag logic

class FindObject(Find):
class FindObject(Find):
    """Find any object"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the serialized data of the matching instance, or None if nothing matches

        Raises EntryInvalidError when the instance is not a SerializerModel.
        """
        instance = self._get_instance(entry, blueprint)
        if instance:
            if isinstance(instance, SerializerModel):
                return instance.serializer(instance=instance).data
            raise EntryInvalidError.from_entry(
                f"Model {self.model_name} is not resolvable through FindObject", entry
            )
        return None

Find any object

def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
416    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
417        instance = self._get_instance(entry, blueprint)
418        if not instance:
419            return None
420        if not isinstance(instance, SerializerModel):
421            raise EntryInvalidError.from_entry(
422                f"Model {self.model_name} is not resolvable through FindObject", entry
423            )
424        return instance.serializer(instance=instance).data

Implement yaml tag logic

Inherited Members
Find
Find
model_name
conditions
class Condition(YAMLTag):
class Condition(YAMLTag):
    """Convert all values to a single boolean"""

    mode: Literal["AND", "NAND", "OR", "NOR", "XOR", "XNOR"]
    args: list[Any]

    # Maps each mode to a callable that reduces a tuple of booleans to one boolean.
    _COMPARATORS = {
        # Using all and any here instead of from operator import iand, ior
        # to improve performance
        "AND": all,
        "NAND": lambda values: not all(values),
        "OR": any,
        "NOR": lambda values: not any(values),
        "XOR": lambda values: reduce(ixor, values) if len(values) > 1 else values[0],
        "XNOR": lambda values: not (reduce(ixor, values) if len(values) > 1 else values[0]),
    }

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        """Read the mode (first item) and the values to combine (remaining items)"""
        super().__init__()
        self.mode = loader.construct_object(node.value[0])
        self.args = [loader.construct_object(raw_node) for raw_node in node.value[1:]]

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Combine the truthiness of all values according to the mode (case-insensitive)

        Raises EntryInvalidError when no values were given or the mode is unknown.
        """
        args = [
            arg.resolve(entry, blueprint) if isinstance(arg, YAMLTag) else arg for arg in self.args
        ]

        if not args:
            raise EntryInvalidError.from_entry(
                "At least one value is required after mode selection.", entry
            )

        try:
            comparator = self._COMPARATORS[self.mode.upper()]
            truth_values = tuple(bool(x) for x in args)
            return comparator(truth_values)
        except (TypeError, KeyError) as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc

Convert all values to a single boolean

Condition( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
444    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
445        super().__init__()
446        self.mode = loader.construct_object(node.value[0])
447        self.args = []
448        for raw_node in node.value[1:]:
449            self.args.append(loader.construct_object(raw_node))
mode: Literal['AND', 'NAND', 'OR', 'NOR', 'XOR', 'XNOR']
args: list[typing.Any]
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
451    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
452        args = []
453        for arg in self.args:
454            if isinstance(arg, YAMLTag):
455                args.append(arg.resolve(entry, blueprint))
456            else:
457                args.append(arg)
458
459        if not args:
460            raise EntryInvalidError.from_entry(
461                "At least one value is required after mode selection.", entry
462            )
463
464        try:
465            comparator = self._COMPARATORS[self.mode.upper()]
466            return comparator(tuple(bool(x) for x in args))
467        except (TypeError, KeyError) as exc:
468            raise EntryInvalidError.from_entry(exc, entry) from exc

Implement yaml tag logic

class If(YAMLTag):
class If(YAMLTag):
    """Select YAML to use based on condition"""

    condition: Any
    when_true: Any
    when_false: Any

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        """Read the condition and, unless it is the only item, the two branches"""
        super().__init__()
        self.condition = loader.construct_object(node.value[0])
        if len(node.value) != 1:
            self.when_true = loader.construct_object(node.value[1])
            self.when_false = loader.construct_object(node.value[2])
        else:
            # Condition only: the tag resolves to a plain boolean.
            self.when_true = True
            self.when_false = False

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the branch selected by the condition, with its own tags resolved

        Raises EntryInvalidError when resolving the branch raises a TypeError.
        """
        condition = self.condition
        if isinstance(condition, YAMLTag):
            condition = condition.resolve(entry, blueprint)

        try:
            branch = self.when_true if condition else self.when_false
            return entry.tag_resolver(branch, blueprint)
        except TypeError as exc:
            raise EntryInvalidError.from_entry(exc, entry) from exc

Select YAML to use based on condition

If( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
478    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
479        super().__init__()
480        self.condition = loader.construct_object(node.value[0])
481        if len(node.value) == 1:
482            self.when_true = True
483            self.when_false = False
484        else:
485            self.when_true = loader.construct_object(node.value[1])
486            self.when_false = loader.construct_object(node.value[2])
condition: Any
when_true: Any
when_false: Any
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
488    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
489        if isinstance(self.condition, YAMLTag):
490            condition = self.condition.resolve(entry, blueprint)
491        else:
492            condition = self.condition
493
494        try:
495            return entry.tag_resolver(
496                self.when_true if condition else self.when_false,
497                blueprint,
498            )
499        except TypeError as exc:
500            raise EntryInvalidError.from_entry(exc, entry) from exc

Implement yaml tag logic

class Enumerate(YAMLTag, YAMLTagContext):
class Enumerate(YAMLTag, YAMLTagContext):
    """Iterate over an iterable and build a sequence or mapping from its items.

    The tag is built from a three-element sequence node: the iterable (or a tag
    resolving to one), the output kind (``SEQ`` or ``MAP``, matched
    case-insensitively) and the body that is resolved once per item. While the body
    is being resolved, the current pair is available through ``get_context``.
    """

    iterable: YAMLTag | Iterable
    item_body: Any
    output_body: Literal["SEQ", "MAP"]

    # Output kind -> (container type, function folding one resolved body into it)
    _OUTPUT_BODIES = {
        "SEQ": (list, lambda a, b: [*a, b]),
        "MAP": (
            dict,
            lambda a, b: always_merger.merge(a, {b[0]: b[1]} if isinstance(b, tuple | list) else b),
        ),
    }

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.iterable = loader.construct_object(node.value[0])
        self.output_body = loader.construct_object(node.value[1])
        self.item_body = loader.construct_object(node.value[2])
        # Pair currently being processed; empty outside of an iteration
        self.__current_context: tuple[Any, Any] = tuple()

    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the pair currently being iterated (empty tuple outside of a loop)."""
        return self.__current_context

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Resolve the item body once per item and collect the results.

        Mappings are iterated as ``(key, value)`` pairs, any other iterable as
        ``(index, value)`` pairs. The result is a list for ``SEQ`` and a dict for
        ``MAP``.

        Raises:
            EntryInvalidError: when the iterable is an item tag with depth 0, is not
                iterable, the output kind is unknown or not a string, or a resolved
                item cannot be added to the output.
        """
        if isinstance(self.iterable, EnumeratedItem) and self.iterable.depth == 0:
            raise EntryInvalidError.from_entry(
                f"{self.__class__.__name__} tag's iterable references this tag's context. "
                "This is a noop. Check you are setting depth bigger than 0.",
                entry,
            )

        if isinstance(self.iterable, YAMLTag):
            iterable = self.iterable.resolve(entry, blueprint)
        else:
            iterable = self.iterable

        if not isinstance(iterable, Iterable):
            raise EntryInvalidError.from_entry(
                f"{self.__class__.__name__}'s iterable must be an iterable "
                "such as a sequence or a mapping",
                entry,
            )

        if isinstance(iterable, Mapping):
            iterable = tuple(iterable.items())
        else:
            iterable = tuple(enumerate(iterable))

        try:
            output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()]
        except (AttributeError, KeyError) as exc:
            # AttributeError: the output kind is not a string and has no ``upper``
            raise EntryInvalidError.from_entry(exc, entry) from exc

        result = output_class()

        self.__current_context = tuple()

        try:
            for item in iterable:
                self.__current_context = item
                resolved_body = entry.tag_resolver(self.item_body, blueprint)
                try:
                    result = add_fn(result, resolved_body)
                except (IndexError, TypeError) as exc:
                    # e.g. a MAP item that is a sequence with fewer than two
                    # elements, or whose key is unhashable
                    raise EntryInvalidError.from_entry(
                        f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry
                    ) from exc
                if not isinstance(result, output_class):
                    raise EntryInvalidError.from_entry(
                        f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry
                    )
        finally:
            # Always clear the context so it is not visible after the loop
            self.__current_context = tuple()

        return result

Iterate over an iterable.

Enumerate( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
518    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
519        super().__init__()
520        self.iterable = loader.construct_object(node.value[0])
521        self.output_body = loader.construct_object(node.value[1])
522        self.item_body = loader.construct_object(node.value[2])
523        self.__current_context: tuple[Any, Any] = tuple()
iterable: YAMLTag | Iterable
item_body: Any
output_body: Literal['SEQ', 'MAP']
def get_context( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
525    def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
526        return self.__current_context

Implement yaml tag context logic

def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
528    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
529        if isinstance(self.iterable, EnumeratedItem) and self.iterable.depth == 0:
530            raise EntryInvalidError.from_entry(
531                f"{self.__class__.__name__} tag's iterable references this tag's context. "
532                "This is a noop. Check you are setting depth bigger than 0.",
533                entry,
534            )
535
536        if isinstance(self.iterable, YAMLTag):
537            iterable = self.iterable.resolve(entry, blueprint)
538        else:
539            iterable = self.iterable
540
541        if not isinstance(iterable, Iterable):
542            raise EntryInvalidError.from_entry(
543                f"{self.__class__.__name__}'s iterable must be an iterable "
544                "such as a sequence or a mapping",
545                entry,
546            )
547
548        if isinstance(iterable, Mapping):
549            iterable = tuple(iterable.items())
550        else:
551            iterable = tuple(enumerate(iterable))
552
553        try:
554            output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()]
555        except KeyError as exc:
556            raise EntryInvalidError.from_entry(exc, entry) from exc
557
558        result = output_class()
559
560        self.__current_context = tuple()
561
562        try:
563            for item in iterable:
564                self.__current_context = item
565                resolved_body = entry.tag_resolver(self.item_body, blueprint)
566                result = add_fn(result, resolved_body)
567                if not isinstance(result, output_class):
568                    raise EntryInvalidError.from_entry(
569                        f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry
570                    )
571        finally:
572            self.__current_context = tuple()
573
574        return result

Implement yaml tag logic

class EnumeratedItem(YAMLTag):
class EnumeratedItem(YAMLTag):
    """Resolve to the current pair exposed by an Enumerate tag context.

    ``depth`` is read from the scalar node and handed to the entry's
    ``get_tag_context`` lookup to select the context tag.
    """

    depth: int

    # Context tag types this tag is allowed to read from
    _SUPPORTED_CONTEXT_TAGS = (Enumerate,)

    def __init__(self, _loader: BlueprintLoader, node: ScalarNode) -> None:
        super().__init__()
        raw_depth = node.value
        self.depth = int(raw_depth)

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return the context of the selected Enumerate tag.

        Raises:
            EntryInvalidError: when the context lookup fails with a ValueError.
        """
        try:
            context_tag: Enumerate = entry.get_tag_context(
                depth=self.depth,
                context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS,
            )
        except ValueError as exc:
            tag_name = self.__class__.__name__
            if self.depth == 0:
                message = f"{tag_name} tags are only usable inside an {Enumerate.__name__} tag"
            else:
                message = f"{tag_name} tag: {exc}"
            raise EntryInvalidError.from_entry(message, entry) from exc

        return context_tag.get_context(entry, blueprint)

Get the current item value and index provided by an Enumerate tag context

EnumeratedItem( _loader: BlueprintLoader, node: yaml.nodes.ScalarNode)
584    def __init__(self, _loader: BlueprintLoader, node: ScalarNode) -> None:
585        super().__init__()
586        self.depth = int(node.value)
depth: int
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
588    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
589        try:
590            context_tag: Enumerate = entry.get_tag_context(
591                depth=self.depth,
592                context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS,
593            )
594        except ValueError as exc:
595            if self.depth == 0:
596                raise EntryInvalidError.from_entry(
597                    f"{self.__class__.__name__} tags are only usable "
598                    f"inside an {Enumerate.__name__} tag",
599                    entry,
600                ) from exc
601
602            raise EntryInvalidError.from_entry(
603                f"{self.__class__.__name__} tag: {exc}", entry
604            ) from exc
605
606        return context_tag.get_context(entry, blueprint)

Implement yaml tag logic

class Index(EnumeratedItem):
class Index(EnumeratedItem):
    """Resolve to the first element (index or key) of the current Enumerate pair"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return element 0 of the context provided by the parent lookup.

        Raises:
            EntryInvalidError: when the context has no element 0.
        """
        pair = super().resolve(entry, blueprint)

        try:
            index = pair[0]
        except IndexError as exc:  # pragma: no cover
            raise EntryInvalidError.from_entry(f"Empty/invalid context: {pair}", entry) from exc
        return index

Get the current item index provided by an Enumerate tag context

def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
612    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
613        context = super().resolve(entry, blueprint)
614
615        try:
616            return context[0]
617        except IndexError as exc:  # pragma: no cover
618            raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc

Implement yaml tag logic

Inherited Members
EnumeratedItem
EnumeratedItem
depth
class Value(EnumeratedItem):
class Value(EnumeratedItem):
    """Resolve to the second element (value) of the current Enumerate pair"""

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Return element 1 of the context provided by the parent lookup.

        Raises:
            EntryInvalidError: when the context has no element 1.
        """
        pair = super().resolve(entry, blueprint)

        try:
            value = pair[1]
        except IndexError as exc:  # pragma: no cover
            raise EntryInvalidError.from_entry(f"Empty/invalid context: {pair}", entry) from exc
        return value

Get the current item value provided by an Enumerate tag context

def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
624    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
625        context = super().resolve(entry, blueprint)
626
627        try:
628            return context[1]
629        except IndexError as exc:  # pragma: no cover
630            raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc

Implement yaml tag logic

Inherited Members
EnumeratedItem
EnumeratedItem
depth
class AtIndex(YAMLTag):
class AtIndex(YAMLTag):
    """Get the value at an index of a sequence or at a key of a mapping.

    The tag is built from ``[obj, attribute]`` or ``[obj, attribute, default]``.
    Without a default, a missing index or key is an error.
    """

    obj: YAMLTag | dict | list | tuple
    attribute: int | str | YAMLTag
    default: Any | UNSET

    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
        super().__init__()
        self.obj = loader.construct_object(node.value[0])
        self.attribute = loader.construct_object(node.value[1])
        if len(node.value) == 2:  # noqa: PLR2004
            # The UNSET class itself is the "no default given" marker
            self.default = UNSET
        else:
            self.default = loader.construct_object(node.value[2])

    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
        """Resolve ``obj`` and ``attribute`` and return ``obj[attribute]``.

        Returns the default when the index is out of range or the key is missing and
        a default was given.

        Raises:
            EntryInvalidError: when the index or key has an unusable type for
                ``obj``, or when it is missing and no default was given.
        """
        if isinstance(self.obj, YAMLTag):
            obj = self.obj.resolve(entry, blueprint)
        else:
            obj = self.obj
        if isinstance(self.attribute, YAMLTag):
            attribute = self.attribute.resolve(entry, blueprint)
        else:
            attribute = self.attribute

        if isinstance(obj, list | tuple):
            try:
                return obj[attribute]
            except TypeError as exc:
                raise EntryInvalidError.from_entry(
                    f"Invalid index for list: {attribute}", entry
                ) from exc
            except IndexError as exc:
                if self.default is UNSET:
                    raise EntryInvalidError.from_entry(
                        f"Index out of range: {attribute}", entry
                    ) from exc
                return self.default

        try:
            if attribute in obj:
                return obj[attribute]
        except TypeError as exc:
            # obj is not a container (e.g. None or a number), or the key cannot be
            # used with it (e.g. an unhashable key for a dict)
            raise EntryInvalidError.from_entry(
                f"Invalid key for {type(obj).__name__}: {attribute}", entry
            ) from exc

        if self.default is UNSET:
            raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry)
        return self.default

Get value at index of a sequence or mapping

AtIndex( loader: BlueprintLoader, node: yaml.nodes.SequenceNode)
640    def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None:
641        super().__init__()
642        self.obj = loader.construct_object(node.value[0])
643        self.attribute = loader.construct_object(node.value[1])
644        if len(node.value) == 2:  # noqa: PLR2004
645            self.default = UNSET
646        else:
647            self.default = loader.construct_object(node.value[2])
obj: YAMLTag | dict | list | tuple
attribute: int | str | YAMLTag
default: Any | UNSET
def resolve( self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
649    def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
650        if isinstance(self.obj, YAMLTag):
651            obj = self.obj.resolve(entry, blueprint)
652        else:
653            obj = self.obj
654        if isinstance(self.attribute, YAMLTag):
655            attribute = self.attribute.resolve(entry, blueprint)
656        else:
657            attribute = self.attribute
658
659        if isinstance(obj, list | tuple):
660            try:
661                return obj[attribute]
662            except TypeError as exc:
663                raise EntryInvalidError.from_entry(
664                    f"Invalid index for list: {attribute}", entry
665                ) from exc
666            except IndexError as exc:
667                if self.default is UNSET:
668                    raise EntryInvalidError.from_entry(
669                        f"Index out of range: {attribute}", entry
670                    ) from exc
671                return self.default
672        if attribute in obj:
673            return obj[attribute]
674        else:
675            if self.default is UNSET:
676                raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry)
677            return self.default

Implement yaml tag logic

class BlueprintDumper(yaml.dumper.SafeDumper):
class BlueprintDumper(SafeDumper):
    """YAML dumper that can serialise blueprint dataclasses"""

    default_flow_style = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        def as_text(dumper, data):
            return dumper.represent_str(str(data))

        def as_enum_value(dumper, data):
            return dumper.represent_str(data.value)

        self.add_representer(UUID, as_text)
        self.add_representer(OrderedDict, lambda dumper, data: dumper.represent_dict(dict(data)))
        # NOTE(review): PyYAML's add_representer matches the exact type only, so this
        # Enum entry presumably never applies to Enum subclasses (which would explain
        # the separate registration below) -- confirm whether add_multi_representer
        # was intended.
        self.add_representer(Enum, as_enum_value)
        self.add_representer(BlueprintEntryDesiredState, as_enum_value)
        # Registered under None: represent the data as its str() form
        self.add_representer(None, as_text)

    def ignore_aliases(self, data):
        """Don't use any YAML anchors"""
        return True

    def represent(self, data) -> None:
        """Represent ``data``, converting dataclasses to plain dicts first."""
        if not is_dataclass(data):
            return super().represent(data)

        def factory(items):
            cleaned = dict(items)
            # Remove internal state variables
            cleaned.pop("_state", None)
            # Future-proof to only remove the ID if we don't set a value
            if "id" in cleaned and cleaned["id"] is None:
                del cleaned["id"]
            return cleaned

        return super().represent(asdict(data, dict_factory=factory))

Dump dataclasses to yaml

BlueprintDumper(*args, **kwargs)
685    def __init__(self, *args, **kwargs):
686        super().__init__(*args, **kwargs)
687        self.add_representer(UUID, lambda self, data: self.represent_str(str(data)))
688        self.add_representer(OrderedDict, lambda self, data: self.represent_dict(dict(data)))
689        self.add_representer(Enum, lambda self, data: self.represent_str(data.value))
690        self.add_representer(
691            BlueprintEntryDesiredState, lambda self, data: self.represent_str(data.value)
692        )
693        self.add_representer(None, lambda self, data: self.represent_str(str(data)))
default_flow_style = False
def ignore_aliases(self, data):
695    def ignore_aliases(self, data):
696        """Don't use any YAML anchors"""
697        return True

Don't use any YAML anchors

def represent(self, data) -> None:
699    def represent(self, data) -> None:
700        if is_dataclass(data):
701
702            def factory(items):
703                final_dict = dict(items)
704                # Remove internal state variables
705                final_dict.pop("_state", None)
706                # Future-proof to only remove the ID if we don't set a value
707                if "id" in final_dict and final_dict.get("id") is None:
708                    final_dict.pop("id")
709                return final_dict
710
711            data = asdict(data, dict_factory=factory)
712        return super().represent(data)
class BlueprintLoader(yaml.loader.SafeLoader):
class BlueprintLoader(SafeLoader):
    """YAML loader that understands the custom blueprint tags"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # YAML tag name -> class constructing the tag
        tag_constructors = {
            "!KeyOf": KeyOf,
            "!Find": Find,
            "!FindObject": FindObject,
            "!Context": Context,
            "!Format": Format,
            "!Condition": Condition,
            "!If": If,
            "!Env": Env,
            "!File": File,
            "!Enumerate": Enumerate,
            "!Value": Value,
            "!Index": Index,
            "!AtIndex": AtIndex,
            "!ParseJSON": ParseJSON,
        }
        for tag, constructor in tag_constructors.items():
            self.add_constructor(tag, constructor)

Loader for blueprints with custom tag support

BlueprintLoader(*args, **kwargs)
718    def __init__(self, *args, **kwargs):
719        super().__init__(*args, **kwargs)
720        self.add_constructor("!KeyOf", KeyOf)
721        self.add_constructor("!Find", Find)
722        self.add_constructor("!FindObject", FindObject)
723        self.add_constructor("!Context", Context)
724        self.add_constructor("!Format", Format)
725        self.add_constructor("!Condition", Condition)
726        self.add_constructor("!If", If)
727        self.add_constructor("!Env", Env)
728        self.add_constructor("!File", File)
729        self.add_constructor("!Enumerate", Enumerate)
730        self.add_constructor("!Value", Value)
731        self.add_constructor("!Index", Index)
732        self.add_constructor("!AtIndex", AtIndex)
733        self.add_constructor("!ParseJSON", ParseJSON)

Initialize the scanner.

class EntryInvalidError(authentik.lib.sentry.SentryIgnoredException):
class EntryInvalidError(SentryIgnoredException):
    """Raised when a blueprint entry is invalid"""

    entry_model: str | None
    entry_id: str | None
    validation_error: ValidationError | None
    serializer: Serializer | None = None

    def __init__(
        self, *args: object, validation_error: ValidationError | None = None, **kwargs
    ) -> None:
        super().__init__(*args)
        self.entry_model = self.entry_id = None
        self.validation_error = validation_error
        # Any further keyword argument is stored as an attribute of the same name
        for name in kwargs:
            setattr(self, name, kwargs[name])

    @staticmethod
    def from_entry(
        msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs
    ) -> EntryInvalidError:
        """Build an EntryInvalidError carrying the model and id of ``entry``"""
        error = EntryInvalidError(msg_or_exc, *args, **kwargs)
        if isinstance(msg_or_exc, ValidationError):
            error.validation_error = msg_or_exc
        # The model and id may still be YAMLTag instances depending on where the
        # error happens, so always store their string form
        model_name = str(entry.model)
        error.entry_model = model_name
        identifier = str(entry.id)
        error.entry_id = identifier
        return error

Error raised when an entry is invalid

EntryInvalidError( *args: object, validation_error: rest_framework.exceptions.ValidationError | None = None, **kwargs)
744    def __init__(
745        self, *args: object, validation_error: ValidationError | None = None, **kwargs
746    ) -> None:
747        super().__init__(*args)
748        self.entry_model = None
749        self.entry_id = None
750        self.validation_error = validation_error
751        for key, value in kwargs.items():
752            setattr(self, key, value)
entry_model: str | None
entry_id: str | None
validation_error: rest_framework.exceptions.ValidationError | None
serializer: rest_framework.serializers.Serializer | None = None
@staticmethod
def from_entry( msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs) -> EntryInvalidError:
754    @staticmethod
755    def from_entry(
756        msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs
757    ) -> EntryInvalidError:
758        """Create EntryInvalidError with the context of an entry"""
759        error = EntryInvalidError(msg_or_exc, *args, **kwargs)
760        if isinstance(msg_or_exc, ValidationError):
761            error.validation_error = msg_or_exc
762        # Make sure the model and id are strings, depending where the error happens
763        # they might still be YAMLTag instances
764        error.entry_model = str(entry.model)
765        error.entry_id = str(entry.id)
766        return error

Create EntryInvalidError with the context of an entry