authentik.blueprints.v1.common
transfer common classes
1"""transfer common classes""" 2 3from collections import OrderedDict 4from collections.abc import Generator, Iterable, Mapping 5from copy import copy 6from dataclasses import asdict, dataclass, field, is_dataclass 7from enum import Enum 8from functools import reduce 9from json import JSONDecodeError, loads 10from operator import ixor 11from os import getenv 12from typing import Any, Literal 13from uuid import UUID 14 15from deepmerge import always_merger 16from django.apps import apps 17from django.db.models import Model, Q 18from rest_framework.exceptions import ValidationError 19from rest_framework.fields import Field 20from rest_framework.serializers import Serializer 21from structlog.stdlib import get_logger 22from yaml import SafeDumper, SafeLoader, ScalarNode, SequenceNode 23 24from authentik.lib.models import SerializerModel 25from authentik.lib.sentry import SentryIgnoredException 26from authentik.policies.models import PolicyBindingModel 27 28LOGGER = get_logger() 29 30 31class UNSET: 32 """Used to test whether a key has not been set.""" 33 34 35def get_attrs(obj: SerializerModel) -> dict[str, Any]: 36 """Get object's attributes via their serializer, and convert it to a normal dict""" 37 serializer: Serializer = obj.serializer(obj) 38 data = dict(serializer.data) 39 40 for field_name, _field in serializer.fields.items(): 41 _field: Field 42 if field_name not in data: 43 continue 44 if _field.read_only: 45 data.pop(field_name, None) 46 if field_name.endswith("_set"): 47 data.pop(field_name, None) 48 return data 49 50 51@dataclass 52class BlueprintEntryState: 53 """State of a single instance""" 54 55 instance: Model | None = None 56 57 58class BlueprintEntryDesiredState(Enum): 59 """State an entry should be reconciled to""" 60 61 ABSENT = "absent" 62 PRESENT = "present" 63 CREATED = "created" 64 MUST_CREATED = "must_created" 65 66 67@dataclass 68class BlueprintEntryPermission: 69 """Describe object-level permissions""" 70 71 permission: str | YAMLTag 72 user: int | YAMLTag | None = field(default=None) 73 role: str | YAMLTag | None = field(default=None) 74 75 76@dataclass 77class BlueprintEntry: 78 """Single entry of a blueprint""" 79 80 model: str | YAMLTag 81 state: BlueprintEntryDesiredState | YAMLTag = field(default=BlueprintEntryDesiredState.PRESENT) 82 conditions: list[Any] = field(default_factory=list) 83 identifiers: dict[str, Any] = field(default_factory=dict) 84 attrs: dict[str, Any] | None = field(default_factory=dict) 85 permissions: list[BlueprintEntryPermission] = field(default_factory=list) 86 87 id: str | None = None 88 89 _state: BlueprintEntryState = field(default_factory=BlueprintEntryState) 90 91 def __post_init__(self, *args, **kwargs) -> None: 92 self.__tag_contexts: list[YAMLTagContext] = [] 93 94 @staticmethod 95 def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry: 96 """Convert a SerializerModel instance to a blueprint Entry""" 97 identifiers = { 98 "pk": model.pk, 99 } 100 all_attrs = get_attrs(model) 101 102 for extra_identifier_name in extra_identifier_names: 103 identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None) 104 return BlueprintEntry( 105 identifiers=identifiers, 106 model=f"{model._meta.app_label}.{model._meta.model_name}", 107 attrs=all_attrs, 108 ) 109 110 def get_tag_context( 111 self, 112 depth: int = 0, 113 context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None, 114 ) -> YAMLTagContext: 115 """Get a YAMLTagContext object located at a certain depth in the tag tree""" 116 if depth < 0: 117 raise ValueError("depth must be a positive number or zero") 118 119 if context_tag_type: 120 contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)] 121 else: 122 contexts = self.__tag_contexts 123 124 try: 125 return contexts[-(depth + 1)] 126 except IndexError as exc: 127 raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc 128 129 def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any: 130 """Check if we have any special tags that need handling""" 131 val = copy(value) 132 133 if isinstance(value, YAMLTagContext): 134 self.__tag_contexts.append(value) 135 136 if isinstance(value, YAMLTag): 137 val = value.resolve(self, blueprint) 138 139 if isinstance(value, dict): 140 for key, inner_value in value.items(): 141 val[key] = self.tag_resolver(inner_value, blueprint) 142 if isinstance(value, list): 143 for idx, inner_value in enumerate(value): 144 val[idx] = self.tag_resolver(inner_value, blueprint) 145 146 if isinstance(value, YAMLTagContext): 147 self.__tag_contexts.pop() 148 149 return val 150 151 def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]: 152 """Get attributes of this entry, with all yaml tags resolved""" 153 return self.tag_resolver(self.attrs, blueprint) 154 155 def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]: 156 """Get attributes of this entry, with all yaml tags resolved""" 157 return self.tag_resolver(self.identifiers, blueprint) 158 159 def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState: 160 """Get the blueprint state, with yaml tags resolved if present""" 161 return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint)) 162 163 def get_model(self, blueprint: Blueprint) -> str: 164 """Get the blueprint model, with yaml tags resolved if present""" 165 return str(self.tag_resolver(self.model, blueprint)) 166 167 def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]: 168 """Get permissions of this entry, with all yaml tags resolved""" 169 for perm in self.permissions: 170 yield BlueprintEntryPermission( 171 permission=self.tag_resolver(perm.permission, blueprint), 172 user=self.tag_resolver(perm.user, blueprint), 173 role=self.tag_resolver(perm.role, blueprint), 174 ) 175 176 def check_all_conditions_match(self, blueprint: Blueprint) -> bool: 177 """Check all conditions of this entry match (evaluate to True)""" 178 return all(self.tag_resolver(self.conditions, blueprint)) 179 180 181@dataclass 182class BlueprintMetadata: 183 """Optional blueprint metadata""" 184 185 name: str 186 labels: dict[str, str] = field(default_factory=dict) 187 188 189@dataclass 190class Blueprint: 191 """Dataclass used for a full export""" 192 193 version: int = field(default=1) 194 entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]] = field(default_factory=list) 195 context: dict = field(default_factory=dict) 196 197 metadata: BlueprintMetadata | None = field(default=None) 198 199 def iter_entries(self) -> Iterable[BlueprintEntry]: 200 if isinstance(self.entries, dict): 201 for _section, entries in self.entries.items(): 202 yield from entries 203 else: 204 yield from self.entries 205 206 207class YAMLTag: 208 """Base class for all YAML Tags""" 209 210 def __repr__(self) -> str: 211 return str(self.resolve(BlueprintEntry(""), Blueprint())) 212 213 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 214 """Implement yaml tag logic""" 215 raise NotImplementedError 216 217 218class YAMLTagContext: 219 """Base class for all YAML Tag Contexts""" 220 221 def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 222 """Implement yaml tag context logic""" 223 raise NotImplementedError 224 225 226class KeyOf(YAMLTag): 227 """Reference another object by their ID""" 228 229 id_from: str 230 231 def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None: 232 super().__init__() 233 self.id_from = node.value 234 235 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 236 for _entry in blueprint.iter_entries(): 237 if _entry.id == self.id_from and _entry._state.instance: 238 # Special handling for PolicyBindingModels, as they'll have a different PK 239 # which is used when creating policy bindings 240 if ( 241 isinstance(_entry._state.instance, PolicyBindingModel) 242 and entry.model.lower() == "authentik_policies.policybinding" 243 ): 244 return _entry._state.instance.pbm_uuid 245 return _entry._state.instance.pk 246 raise EntryInvalidError.from_entry( 247 f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry 248 ) 249 250 251class Env(YAMLTag): 252 """Lookup environment variable with optional default""" 253 254 key: str 255 default: Any | None 256 257 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 258 super().__init__() 259 self.default = None 260 if isinstance(node, ScalarNode): 261 self.key = node.value 262 if isinstance(node, SequenceNode): 263 self.key = loader.construct_object(node.value[0]) 264 self.default = loader.construct_object(node.value[1]) 265 266 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 267 return getenv(self.key) or self.default 268 269 270class File(YAMLTag): 271 """Lookup file with optional default""" 272 273 path: str 274 default: Any | None 275 276 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 277 super().__init__() 278 self.default = None 279 if isinstance(node, ScalarNode): 280 self.path = node.value 281 if isinstance(node, SequenceNode): 282 self.path = loader.construct_object(node.value[0]) 283 self.default = loader.construct_object(node.value[1]) 284 285 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 286 try: 287 with open(self.path, encoding="utf8") as _file: 288 return _file.read().strip() 289 except OSError as exc: 290 LOGGER.warning( 291 "Failed to read file. Falling back to default value", 292 path=self.path, 293 exc=exc, 294 ) 295 return self.default 296 297 298class Context(YAMLTag): 299 """Lookup key from instance context""" 300 301 key: str 302 default: Any | None 303 304 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 305 super().__init__() 306 self.default = None 307 if isinstance(node, ScalarNode): 308 self.key = node.value 309 if isinstance(node, SequenceNode): 310 self.key = loader.construct_object(node.value[0]) 311 self.default = loader.construct_object(node.value[1]) 312 313 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 314 value = self.default 315 if self.key in blueprint.context: 316 value = blueprint.context[self.key] 317 if isinstance(value, YAMLTag): 318 return value.resolve(entry, blueprint) 319 return value 320 321 322class ParseJSON(YAMLTag): 323 """Parse JSON from context/env/etc value""" 324 325 raw: str 326 327 def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None: 328 super().__init__() 329 self.raw = node.value 330 331 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 332 try: 333 return loads(self.raw) 334 except JSONDecodeError as exc: 335 raise EntryInvalidError.from_entry(exc, entry) from exc 336 337 338class Format(YAMLTag): 339 """Format a string""" 340 341 format_string: str 342 args: list[Any] 343 344 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 345 super().__init__() 346 self.format_string = loader.construct_object(node.value[0]) 347 self.args = [] 348 for raw_node in node.value[1:]: 349 self.args.append(loader.construct_object(raw_node)) 350 351 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 352 args = [] 353 for arg in self.args: 354 if isinstance(arg, YAMLTag): 355 args.append(arg.resolve(entry, blueprint)) 356 else: 357 args.append(arg) 358 359 try: 360 return self.format_string % tuple(args) 361 except TypeError as exc: 362 raise EntryInvalidError.from_entry(exc, entry) from exc 363 364 365class Find(YAMLTag): 366 """Find any object primary key""" 367 368 model_name: str | YAMLTag 369 conditions: list[list] 370 371 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 372 super().__init__() 373 self.model_name = loader.construct_object(node.value[0]) 374 self.conditions = [] 375 for raw_node in node.value[1:]: 376 values = [] 377 for node_values in raw_node.value: 378 values.append(loader.construct_object(node_values)) 379 self.conditions.append(values) 380 381 def _get_instance(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 382 if isinstance(self.model_name, YAMLTag): 383 model_name = self.model_name.resolve(entry, blueprint) 384 else: 385 model_name = self.model_name 386 387 try: 388 model_class = apps.get_model(*model_name.split(".")) 389 except LookupError as exc: 390 raise EntryInvalidError.from_entry(exc, entry) from exc 391 392 query = Q() 393 for cond in self.conditions: 394 if isinstance(cond[0], YAMLTag): 395 query_key = cond[0].resolve(entry, blueprint) 396 else: 397 query_key = cond[0] 398 if isinstance(cond[1], YAMLTag): 399 query_value = cond[1].resolve(entry, blueprint) 400 else: 401 query_value = cond[1] 402 query &= Q(**{query_key: query_value}) 403 return model_class.objects.filter(query).first() 404 405 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 406 instance = self._get_instance(entry, blueprint) 407 if instance: 408 return instance.pk 409 return None 410 411 412class FindObject(Find): 413 """Find any object""" 414 415 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 416 instance = self._get_instance(entry, blueprint) 417 if not instance: 418 return None 419 if not isinstance(instance, SerializerModel): 420 raise EntryInvalidError.from_entry( 421 f"Model {self.model_name} is not resolvable through FindObject", entry 422 ) 423 return instance.serializer(instance=instance).data 424 425 426class Condition(YAMLTag): 427 """Convert all values to a single boolean""" 428 429 mode: Literal["AND", "NAND", "OR", "NOR", "XOR", "XNOR"] 430 args: list[Any] 431 432 _COMPARATORS = { 433 # Using all and any here instead of from operator import iand, ior 434 # to improve performance 435 "AND": all, 436 "NAND": lambda args: not all(args), 437 "OR": any, 438 "NOR": lambda args: not any(args), 439 "XOR": lambda args: reduce(ixor, args) if len(args) > 1 else args[0], 440 "XNOR": lambda args: not (reduce(ixor, args) if len(args) > 1 else args[0]), 441 } 442 443 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 444 super().__init__() 445 self.mode = loader.construct_object(node.value[0]) 446 self.args = [] 447 for raw_node in node.value[1:]: 448 self.args.append(loader.construct_object(raw_node)) 449 450 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 451 args = [] 452 for arg in self.args: 453 if isinstance(arg, YAMLTag): 454 args.append(arg.resolve(entry, blueprint)) 455 else: 456 args.append(arg) 457 458 if not args: 459 raise EntryInvalidError.from_entry( 460 "At least one value is required after mode selection.", entry 461 ) 462 463 try: 464 comparator = self._COMPARATORS[self.mode.upper()] 465 return comparator(tuple(bool(x) for x in args)) 466 except (TypeError, KeyError) as exc: 467 raise EntryInvalidError.from_entry(exc, entry) from exc 468 469 470class If(YAMLTag): 471 """Select YAML to use based on condition""" 472 473 condition: Any 474 when_true: Any 475 when_false: Any 476 477 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 478 super().__init__() 479 self.condition = loader.construct_object(node.value[0]) 480 if len(node.value) == 1: 481 self.when_true = True 482 self.when_false = False 483 else: 484 self.when_true = loader.construct_object(node.value[1]) 485 self.when_false = loader.construct_object(node.value[2]) 486 487 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 488 if isinstance(self.condition, YAMLTag): 489 condition = self.condition.resolve(entry, blueprint) 490 else: 491 condition = self.condition 492 493 try: 494 return entry.tag_resolver( 495 self.when_true if condition else self.when_false, 496 blueprint, 497 ) 498 except TypeError as exc: 499 raise EntryInvalidError.from_entry(exc, entry) from exc 500 501 502class Enumerate(YAMLTag, YAMLTagContext): 503 """Iterate over an iterable.""" 504 505 iterable: YAMLTag | Iterable 506 item_body: Any 507 output_body: Literal["SEQ", "MAP"] 508 509 _OUTPUT_BODIES = { 510 "SEQ": (list, lambda a, b: [*a, b]), 511 "MAP": ( 512 dict, 513 lambda a, b: always_merger.merge(a, {b[0]: b[1]} if isinstance(b, tuple | list) else b), 514 ), 515 } 516 517 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 518 super().__init__() 519 self.iterable = loader.construct_object(node.value[0]) 520 self.output_body = loader.construct_object(node.value[1]) 521 self.item_body = loader.construct_object(node.value[2]) 522 self.__current_context: tuple[Any, Any] = tuple() 523 524 def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 525 return self.__current_context 526 527 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 528 if isinstance(self.iterable, EnumeratedItem) and self.iterable.depth == 0: 529 raise EntryInvalidError.from_entry( 530 f"{self.__class__.__name__} tag's iterable references this tag's context. " 531 "This is a noop. Check you are setting depth bigger than 0.", 532 entry, 533 ) 534 535 if isinstance(self.iterable, YAMLTag): 536 iterable = self.iterable.resolve(entry, blueprint) 537 else: 538 iterable = self.iterable 539 540 if not isinstance(iterable, Iterable): 541 raise EntryInvalidError.from_entry( 542 f"{self.__class__.__name__}'s iterable must be an iterable " 543 "such as a sequence or a mapping", 544 entry, 545 ) 546 547 if isinstance(iterable, Mapping): 548 iterable = tuple(iterable.items()) 549 else: 550 iterable = tuple(enumerate(iterable)) 551 552 try: 553 output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()] 554 except KeyError as exc: 555 raise EntryInvalidError.from_entry(exc, entry) from exc 556 557 result = output_class() 558 559 self.__current_context = tuple() 560 561 try: 562 for item in iterable: 563 self.__current_context = item 564 resolved_body = entry.tag_resolver(self.item_body, blueprint) 565 result = add_fn(result, resolved_body) 566 if not isinstance(result, output_class): 567 raise EntryInvalidError.from_entry( 568 f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry 569 ) 570 finally: 571 self.__current_context = tuple() 572 573 return result 574 575 576class EnumeratedItem(YAMLTag): 577 """Get the current item value and index provided by an Enumerate tag context""" 578 579 depth: int 580 581 _SUPPORTED_CONTEXT_TAGS = (Enumerate,) 582 583 def __init__(self, _loader: BlueprintLoader, node: ScalarNode) -> None: 584 super().__init__() 585 self.depth = int(node.value) 586 587 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 588 try: 589 context_tag: Enumerate = entry.get_tag_context( 590 depth=self.depth, 591 context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS, 592 ) 593 except ValueError as exc: 594 if self.depth == 0: 595 raise EntryInvalidError.from_entry( 596 f"{self.__class__.__name__} tags are only usable " 597 f"inside an {Enumerate.__name__} tag", 598 entry, 599 ) from exc 600 601 raise EntryInvalidError.from_entry( 602 f"{self.__class__.__name__} tag: {exc}", entry 603 ) from exc 604 605 return context_tag.get_context(entry, blueprint) 606 607 608class Index(EnumeratedItem): 609 """Get the current item index provided by an Enumerate tag context""" 610 611 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 612 context = super().resolve(entry, blueprint) 613 614 try: 615 return context[0] 616 except IndexError as exc: # pragma: no cover 617 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc 618 619 620class Value(EnumeratedItem): 621 """Get the current item value provided by an Enumerate tag context""" 622 623 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 624 context = super().resolve(entry, blueprint) 625 626 try: 627 return context[1] 628 except IndexError as exc: # pragma: no cover 629 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc 630 631 632class AtIndex(YAMLTag): 633 """Get value at index of a sequence or mapping""" 634 635 obj: YAMLTag | dict | list | tuple 636 attribute: int | str | YAMLTag 637 default: Any | UNSET 638 639 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 640 super().__init__() 641 self.obj = loader.construct_object(node.value[0]) 642 self.attribute = loader.construct_object(node.value[1]) 643 if len(node.value) == 2: # noqa: PLR2004 644 self.default = UNSET 645 else: 646 self.default = loader.construct_object(node.value[2]) 647 648 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 649 if isinstance(self.obj, YAMLTag): 650 obj = self.obj.resolve(entry, blueprint) 651 else: 652 obj = self.obj 653 if isinstance(self.attribute, YAMLTag): 654 attribute = self.attribute.resolve(entry, blueprint) 655 else: 656 attribute = self.attribute 657 658 if isinstance(obj, list | tuple): 659 try: 660 return obj[attribute] 661 except TypeError as exc: 662 raise EntryInvalidError.from_entry( 663 f"Invalid index for list: {attribute}", entry 664 ) from exc 665 except IndexError as exc: 666 if self.default is UNSET: 667 raise EntryInvalidError.from_entry( 668 f"Index out of range: {attribute}", entry 669 ) from exc 670 return self.default 671 if attribute in obj: 672 return obj[attribute] 673 else: 674 if self.default is UNSET: 675 raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry) 676 return self.default 677 678 679class BlueprintDumper(SafeDumper): 680 """Dump dataclasses to yaml""" 681 682 default_flow_style = False 683 684 def __init__(self, *args, **kwargs): 685 super().__init__(*args, **kwargs) 686 self.add_representer(UUID, lambda self, data: self.represent_str(str(data))) 687 self.add_representer(OrderedDict, lambda self, data: self.represent_dict(dict(data))) 688 self.add_representer(Enum, lambda self, data: self.represent_str(data.value)) 689 self.add_representer( 690 BlueprintEntryDesiredState, lambda self, data: self.represent_str(data.value) 691 ) 692 self.add_representer(None, lambda self, data: self.represent_str(str(data))) 693 694 def ignore_aliases(self, data): 695 """Don't use any YAML anchors""" 696 return True 697 698 def represent(self, data) -> None: 699 if is_dataclass(data): 700 701 def factory(items): 702 final_dict = dict(items) 703 # Remove internal state variables 704 final_dict.pop("_state", None) 705 # Future-proof to only remove the ID if we don't set a value 706 if "id" in final_dict and final_dict.get("id") is None: 707 final_dict.pop("id") 708 return final_dict 709 710 data = asdict(data, dict_factory=factory) 711 return super().represent(data) 712 713 714class BlueprintLoader(SafeLoader): 715 """Loader for blueprints with custom tag support""" 716 717 def __init__(self, *args, **kwargs): 718 super().__init__(*args, **kwargs) 719 self.add_constructor("!KeyOf", KeyOf) 720 self.add_constructor("!Find", Find) 721 self.add_constructor("!FindObject", FindObject) 722 self.add_constructor("!Context", Context) 723 self.add_constructor("!Format", Format) 724 self.add_constructor("!Condition", Condition) 725 self.add_constructor("!If", If) 726 self.add_constructor("!Env", Env) 727 self.add_constructor("!File", File) 728 self.add_constructor("!Enumerate", Enumerate) 729 self.add_constructor("!Value", Value) 730 self.add_constructor("!Index", Index) 731 self.add_constructor("!AtIndex", AtIndex) 732 self.add_constructor("!ParseJSON", ParseJSON) 733 734 735class EntryInvalidError(SentryIgnoredException): 736 """Error raised when an entry is invalid""" 737 738 entry_model: str | None 739 entry_id: str | None 740 validation_error: ValidationError | None 741 serializer: Serializer | None = None 742 743 def __init__( 744 self, *args: object, validation_error: ValidationError | None = None, **kwargs 745 ) -> None: 746 super().__init__(*args) 747 self.entry_model = None 748 self.entry_id = None 749 self.validation_error = validation_error 750 for key, value in kwargs.items(): 751 setattr(self, key, value) 752 753 @staticmethod 754 def from_entry( 755 msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs 756 ) -> EntryInvalidError: 757 """Create EntryInvalidError with the context of an entry""" 758 error = EntryInvalidError(msg_or_exc, *args, **kwargs) 759 if isinstance(msg_or_exc, ValidationError): 760 error.validation_error = msg_or_exc 761 # Make sure the model and id are strings, depending where the error happens 762 # they might still be YAMLTag instances 763 error.entry_model = str(entry.model) 764 error.entry_id = str(entry.id) 765 return error
Used to test whether a key has not been set.
36def get_attrs(obj: SerializerModel) -> dict[str, Any]: 37 """Get object's attributes via their serializer, and convert it to a normal dict""" 38 serializer: Serializer = obj.serializer(obj) 39 data = dict(serializer.data) 40 41 for field_name, _field in serializer.fields.items(): 42 _field: Field 43 if field_name not in data: 44 continue 45 if _field.read_only: 46 data.pop(field_name, None) 47 if field_name.endswith("_set"): 48 data.pop(field_name, None) 49 return data
Get object's attributes via their serializer, and convert it to a normal dict
52@dataclass 53class BlueprintEntryState: 54 """State of a single instance""" 55 56 instance: Model | None = None
State of a single instance
59class BlueprintEntryDesiredState(Enum): 60 """State an entry should be reconciled to""" 61 62 ABSENT = "absent" 63 PRESENT = "present" 64 CREATED = "created" 65 MUST_CREATED = "must_created"
State an entry should be reconciled to
68@dataclass 69class BlueprintEntryPermission: 70 """Describe object-level permissions""" 71 72 permission: str | YAMLTag 73 user: int | YAMLTag | None = field(default=None) 74 role: str | YAMLTag | None = field(default=None)
Describe object-level permissions
77@dataclass 78class BlueprintEntry: 79 """Single entry of a blueprint""" 80 81 model: str | YAMLTag 82 state: BlueprintEntryDesiredState | YAMLTag = field(default=BlueprintEntryDesiredState.PRESENT) 83 conditions: list[Any] = field(default_factory=list) 84 identifiers: dict[str, Any] = field(default_factory=dict) 85 attrs: dict[str, Any] | None = field(default_factory=dict) 86 permissions: list[BlueprintEntryPermission] = field(default_factory=list) 87 88 id: str | None = None 89 90 _state: BlueprintEntryState = field(default_factory=BlueprintEntryState) 91 92 def __post_init__(self, *args, **kwargs) -> None: 93 self.__tag_contexts: list[YAMLTagContext] = [] 94 95 @staticmethod 96 def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry: 97 """Convert a SerializerModel instance to a blueprint Entry""" 98 identifiers = { 99 "pk": model.pk, 100 } 101 all_attrs = get_attrs(model) 102 103 for extra_identifier_name in extra_identifier_names: 104 identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None) 105 return BlueprintEntry( 106 identifiers=identifiers, 107 model=f"{model._meta.app_label}.{model._meta.model_name}", 108 attrs=all_attrs, 109 ) 110 111 def get_tag_context( 112 self, 113 depth: int = 0, 114 context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None, 115 ) -> YAMLTagContext: 116 """Get a YAMLTagContext object located at a certain depth in the tag tree""" 117 if depth < 0: 118 raise ValueError("depth must be a positive number or zero") 119 120 if context_tag_type: 121 contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)] 122 else: 123 contexts = self.__tag_contexts 124 125 try: 126 return contexts[-(depth + 1)] 127 except IndexError as exc: 128 raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc 129 130 def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any: 131 """Check if we have any special tags that need handling""" 132 val = copy(value) 133 134 if isinstance(value, YAMLTagContext): 135 self.__tag_contexts.append(value) 136 137 if isinstance(value, YAMLTag): 138 val = value.resolve(self, blueprint) 139 140 if isinstance(value, dict): 141 for key, inner_value in value.items(): 142 val[key] = self.tag_resolver(inner_value, blueprint) 143 if isinstance(value, list): 144 for idx, inner_value in enumerate(value): 145 val[idx] = self.tag_resolver(inner_value, blueprint) 146 147 if isinstance(value, YAMLTagContext): 148 self.__tag_contexts.pop() 149 150 return val 151 152 def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]: 153 """Get attributes of this entry, with all yaml tags resolved""" 154 return self.tag_resolver(self.attrs, blueprint) 155 156 def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]: 157 """Get attributes of this entry, with all yaml tags resolved""" 158 return self.tag_resolver(self.identifiers, blueprint) 159 160 def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState: 161 """Get the blueprint state, with yaml tags resolved if present""" 162 return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint)) 163 164 def get_model(self, blueprint: Blueprint) -> str: 165 """Get the blueprint model, with yaml tags resolved if present""" 166 return str(self.tag_resolver(self.model, blueprint)) 167 168 def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]: 169 """Get permissions of this entry, with all yaml tags resolved""" 170 for perm in self.permissions: 171 yield BlueprintEntryPermission( 172 permission=self.tag_resolver(perm.permission, blueprint), 173 user=self.tag_resolver(perm.user, blueprint), 174 role=self.tag_resolver(perm.role, blueprint), 175 ) 176 177 def check_all_conditions_match(self, blueprint: Blueprint) -> bool: 178 """Check all conditions of this entry match (evaluate to True)""" 179 return all(self.tag_resolver(self.conditions, blueprint))
Single entry of a blueprint
95 @staticmethod 96 def from_model(model: SerializerModel, *extra_identifier_names: str) -> BlueprintEntry: 97 """Convert a SerializerModel instance to a blueprint Entry""" 98 identifiers = { 99 "pk": model.pk, 100 } 101 all_attrs = get_attrs(model) 102 103 for extra_identifier_name in extra_identifier_names: 104 identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name, None) 105 return BlueprintEntry( 106 identifiers=identifiers, 107 model=f"{model._meta.app_label}.{model._meta.model_name}", 108 attrs=all_attrs, 109 )
Convert a SerializerModel instance to a blueprint Entry
111 def get_tag_context( 112 self, 113 depth: int = 0, 114 context_tag_type: type[YAMLTagContext] | tuple[YAMLTagContext, ...] | None = None, 115 ) -> YAMLTagContext: 116 """Get a YAMLTagContext object located at a certain depth in the tag tree""" 117 if depth < 0: 118 raise ValueError("depth must be a positive number or zero") 119 120 if context_tag_type: 121 contexts = [x for x in self.__tag_contexts if isinstance(x, context_tag_type)] 122 else: 123 contexts = self.__tag_contexts 124 125 try: 126 return contexts[-(depth + 1)] 127 except IndexError as exc: 128 raise ValueError(f"invalid depth: {depth}. Max depth: {len(contexts) - 1}") from exc
Get a YAMLTagContext object located at a certain depth in the tag tree
130 def tag_resolver(self, value: Any, blueprint: Blueprint) -> Any: 131 """Check if we have any special tags that need handling""" 132 val = copy(value) 133 134 if isinstance(value, YAMLTagContext): 135 self.__tag_contexts.append(value) 136 137 if isinstance(value, YAMLTag): 138 val = value.resolve(self, blueprint) 139 140 if isinstance(value, dict): 141 for key, inner_value in value.items(): 142 val[key] = self.tag_resolver(inner_value, blueprint) 143 if isinstance(value, list): 144 for idx, inner_value in enumerate(value): 145 val[idx] = self.tag_resolver(inner_value, blueprint) 146 147 if isinstance(value, YAMLTagContext): 148 self.__tag_contexts.pop() 149 150 return val
Check if we have any special tags that need handling
152 def get_attrs(self, blueprint: Blueprint) -> dict[str, Any]: 153 """Get attributes of this entry, with all yaml tags resolved""" 154 return self.tag_resolver(self.attrs, blueprint)
Get attributes of this entry, with all yaml tags resolved
156 def get_identifiers(self, blueprint: Blueprint) -> dict[str, Any]: 157 """Get attributes of this entry, with all yaml tags resolved""" 158 return self.tag_resolver(self.identifiers, blueprint)
Get attributes of this entry, with all yaml tags resolved
160 def get_state(self, blueprint: Blueprint) -> BlueprintEntryDesiredState: 161 """Get the blueprint state, with yaml tags resolved if present""" 162 return BlueprintEntryDesiredState(self.tag_resolver(self.state, blueprint))
Get the blueprint state, with yaml tags resolved if present
164 def get_model(self, blueprint: Blueprint) -> str: 165 """Get the blueprint model, with yaml tags resolved if present""" 166 return str(self.tag_resolver(self.model, blueprint))
Get the blueprint model, with yaml tags resolved if present
168 def get_permissions(self, blueprint: Blueprint) -> Generator[BlueprintEntryPermission]: 169 """Get permissions of this entry, with all yaml tags resolved""" 170 for perm in self.permissions: 171 yield BlueprintEntryPermission( 172 permission=self.tag_resolver(perm.permission, blueprint), 173 user=self.tag_resolver(perm.user, blueprint), 174 role=self.tag_resolver(perm.role, blueprint), 175 )
Get permissions of this entry, with all yaml tags resolved
182@dataclass 183class BlueprintMetadata: 184 """Optional blueprint metadata""" 185 186 name: str 187 labels: dict[str, str] = field(default_factory=dict)
Optional blueprint metadata
190@dataclass 191class Blueprint: 192 """Dataclass used for a full export""" 193 194 version: int = field(default=1) 195 entries: list[BlueprintEntry] | dict[str, list[BlueprintEntry]] = field(default_factory=list) 196 context: dict = field(default_factory=dict) 197 198 metadata: BlueprintMetadata | None = field(default=None) 199 200 def iter_entries(self) -> Iterable[BlueprintEntry]: 201 if isinstance(self.entries, dict): 202 for _section, entries in self.entries.items(): 203 yield from entries 204 else: 205 yield from self.entries
Dataclass used for a full export
208class YAMLTag: 209 """Base class for all YAML Tags""" 210 211 def __repr__(self) -> str: 212 return str(self.resolve(BlueprintEntry(""), Blueprint())) 213 214 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 215 """Implement yaml tag logic""" 216 raise NotImplementedError
Base class for all YAML Tags
219class YAMLTagContext: 220 """Base class for all YAML Tag Contexts""" 221 222 def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 223 """Implement yaml tag context logic""" 224 raise NotImplementedError
Base class for all YAML Tag Contexts
227class KeyOf(YAMLTag): 228 """Reference another object by their ID""" 229 230 id_from: str 231 232 def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None: 233 super().__init__() 234 self.id_from = node.value 235 236 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 237 for _entry in blueprint.iter_entries(): 238 if _entry.id == self.id_from and _entry._state.instance: 239 # Special handling for PolicyBindingModels, as they'll have a different PK 240 # which is used when creating policy bindings 241 if ( 242 isinstance(_entry._state.instance, PolicyBindingModel) 243 and entry.model.lower() == "authentik_policies.policybinding" 244 ): 245 return _entry._state.instance.pbm_uuid 246 return _entry._state.instance.pk 247 raise EntryInvalidError.from_entry( 248 f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry 249 )
Reference another object by their ID
236 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 237 for _entry in blueprint.iter_entries(): 238 if _entry.id == self.id_from and _entry._state.instance: 239 # Special handling for PolicyBindingModels, as they'll have a different PK 240 # which is used when creating policy bindings 241 if ( 242 isinstance(_entry._state.instance, PolicyBindingModel) 243 and entry.model.lower() == "authentik_policies.policybinding" 244 ): 245 return _entry._state.instance.pbm_uuid 246 return _entry._state.instance.pk 247 raise EntryInvalidError.from_entry( 248 f"KeyOf: failed to find entry with `id` of `{self.id_from}` and a model instance", entry 249 )
Implement yaml tag logic
252class Env(YAMLTag): 253 """Lookup environment variable with optional default""" 254 255 key: str 256 default: Any | None 257 258 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 259 super().__init__() 260 self.default = None 261 if isinstance(node, ScalarNode): 262 self.key = node.value 263 if isinstance(node, SequenceNode): 264 self.key = loader.construct_object(node.value[0]) 265 self.default = loader.construct_object(node.value[1]) 266 267 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 268 return getenv(self.key) or self.default
Lookup environment variable with optional default
258 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 259 super().__init__() 260 self.default = None 261 if isinstance(node, ScalarNode): 262 self.key = node.value 263 if isinstance(node, SequenceNode): 264 self.key = loader.construct_object(node.value[0]) 265 self.default = loader.construct_object(node.value[1])
271class File(YAMLTag): 272 """Lookup file with optional default""" 273 274 path: str 275 default: Any | None 276 277 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 278 super().__init__() 279 self.default = None 280 if isinstance(node, ScalarNode): 281 self.path = node.value 282 if isinstance(node, SequenceNode): 283 self.path = loader.construct_object(node.value[0]) 284 self.default = loader.construct_object(node.value[1]) 285 286 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 287 try: 288 with open(self.path, encoding="utf8") as _file: 289 return _file.read().strip() 290 except OSError as exc: 291 LOGGER.warning( 292 "Failed to read file. Falling back to default value", 293 path=self.path, 294 exc=exc, 295 ) 296 return self.default
Lookup file with optional default
277 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 278 super().__init__() 279 self.default = None 280 if isinstance(node, ScalarNode): 281 self.path = node.value 282 if isinstance(node, SequenceNode): 283 self.path = loader.construct_object(node.value[0]) 284 self.default = loader.construct_object(node.value[1])
286 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 287 try: 288 with open(self.path, encoding="utf8") as _file: 289 return _file.read().strip() 290 except OSError as exc: 291 LOGGER.warning( 292 "Failed to read file. Falling back to default value", 293 path=self.path, 294 exc=exc, 295 ) 296 return self.default
Implement yaml tag logic
299class Context(YAMLTag): 300 """Lookup key from instance context""" 301 302 key: str 303 default: Any | None 304 305 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 306 super().__init__() 307 self.default = None 308 if isinstance(node, ScalarNode): 309 self.key = node.value 310 if isinstance(node, SequenceNode): 311 self.key = loader.construct_object(node.value[0]) 312 self.default = loader.construct_object(node.value[1]) 313 314 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 315 value = self.default 316 if self.key in blueprint.context: 317 value = blueprint.context[self.key] 318 if isinstance(value, YAMLTag): 319 return value.resolve(entry, blueprint) 320 return value
Lookup key from instance context
305 def __init__(self, loader: BlueprintLoader, node: ScalarNode | SequenceNode) -> None: 306 super().__init__() 307 self.default = None 308 if isinstance(node, ScalarNode): 309 self.key = node.value 310 if isinstance(node, SequenceNode): 311 self.key = loader.construct_object(node.value[0]) 312 self.default = loader.construct_object(node.value[1])
323class ParseJSON(YAMLTag): 324 """Parse JSON from context/env/etc value""" 325 326 raw: str 327 328 def __init__(self, loader: BlueprintLoader, node: ScalarNode) -> None: 329 super().__init__() 330 self.raw = node.value 331 332 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 333 try: 334 return loads(self.raw) 335 except JSONDecodeError as exc: 336 raise EntryInvalidError.from_entry(exc, entry) from exc
Parse JSON from context/env/etc value
339class Format(YAMLTag): 340 """Format a string""" 341 342 format_string: str 343 args: list[Any] 344 345 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 346 super().__init__() 347 self.format_string = loader.construct_object(node.value[0]) 348 self.args = [] 349 for raw_node in node.value[1:]: 350 self.args.append(loader.construct_object(raw_node)) 351 352 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 353 args = [] 354 for arg in self.args: 355 if isinstance(arg, YAMLTag): 356 args.append(arg.resolve(entry, blueprint)) 357 else: 358 args.append(arg) 359 360 try: 361 return self.format_string % tuple(args) 362 except TypeError as exc: 363 raise EntryInvalidError.from_entry(exc, entry) from exc
Format a string
352 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 353 args = [] 354 for arg in self.args: 355 if isinstance(arg, YAMLTag): 356 args.append(arg.resolve(entry, blueprint)) 357 else: 358 args.append(arg) 359 360 try: 361 return self.format_string % tuple(args) 362 except TypeError as exc: 363 raise EntryInvalidError.from_entry(exc, entry) from exc
Implement yaml tag logic
366class Find(YAMLTag): 367 """Find any object primary key""" 368 369 model_name: str | YAMLTag 370 conditions: list[list] 371 372 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 373 super().__init__() 374 self.model_name = loader.construct_object(node.value[0]) 375 self.conditions = [] 376 for raw_node in node.value[1:]: 377 values = [] 378 for node_values in raw_node.value: 379 values.append(loader.construct_object(node_values)) 380 self.conditions.append(values) 381 382 def _get_instance(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 383 if isinstance(self.model_name, YAMLTag): 384 model_name = self.model_name.resolve(entry, blueprint) 385 else: 386 model_name = self.model_name 387 388 try: 389 model_class = apps.get_model(*model_name.split(".")) 390 except LookupError as exc: 391 raise EntryInvalidError.from_entry(exc, entry) from exc 392 393 query = Q() 394 for cond in self.conditions: 395 if isinstance(cond[0], YAMLTag): 396 query_key = cond[0].resolve(entry, blueprint) 397 else: 398 query_key = cond[0] 399 if isinstance(cond[1], YAMLTag): 400 query_value = cond[1].resolve(entry, blueprint) 401 else: 402 query_value = cond[1] 403 query &= Q(**{query_key: query_value}) 404 return model_class.objects.filter(query).first() 405 406 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 407 instance = self._get_instance(entry, blueprint) 408 if instance: 409 return instance.pk 410 return None
Find any object primary key
372 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 373 super().__init__() 374 self.model_name = loader.construct_object(node.value[0]) 375 self.conditions = [] 376 for raw_node in node.value[1:]: 377 values = [] 378 for node_values in raw_node.value: 379 values.append(loader.construct_object(node_values)) 380 self.conditions.append(values)
413class FindObject(Find): 414 """Find any object""" 415 416 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 417 instance = self._get_instance(entry, blueprint) 418 if not instance: 419 return None 420 if not isinstance(instance, SerializerModel): 421 raise EntryInvalidError.from_entry( 422 f"Model {self.model_name} is not resolvable through FindObject", entry 423 ) 424 return instance.serializer(instance=instance).data
Find any object
416 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 417 instance = self._get_instance(entry, blueprint) 418 if not instance: 419 return None 420 if not isinstance(instance, SerializerModel): 421 raise EntryInvalidError.from_entry( 422 f"Model {self.model_name} is not resolvable through FindObject", entry 423 ) 424 return instance.serializer(instance=instance).data
Implement yaml tag logic
Inherited Members
427class Condition(YAMLTag): 428 """Convert all values to a single boolean""" 429 430 mode: Literal["AND", "NAND", "OR", "NOR", "XOR", "XNOR"] 431 args: list[Any] 432 433 _COMPARATORS = { 434 # Using all and any here instead of from operator import iand, ior 435 # to improve performance 436 "AND": all, 437 "NAND": lambda args: not all(args), 438 "OR": any, 439 "NOR": lambda args: not any(args), 440 "XOR": lambda args: reduce(ixor, args) if len(args) > 1 else args[0], 441 "XNOR": lambda args: not (reduce(ixor, args) if len(args) > 1 else args[0]), 442 } 443 444 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 445 super().__init__() 446 self.mode = loader.construct_object(node.value[0]) 447 self.args = [] 448 for raw_node in node.value[1:]: 449 self.args.append(loader.construct_object(raw_node)) 450 451 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 452 args = [] 453 for arg in self.args: 454 if isinstance(arg, YAMLTag): 455 args.append(arg.resolve(entry, blueprint)) 456 else: 457 args.append(arg) 458 459 if not args: 460 raise EntryInvalidError.from_entry( 461 "At least one value is required after mode selection.", entry 462 ) 463 464 try: 465 comparator = self._COMPARATORS[self.mode.upper()] 466 return comparator(tuple(bool(x) for x in args)) 467 except (TypeError, KeyError) as exc: 468 raise EntryInvalidError.from_entry(exc, entry) from exc
Convert all values to a single boolean
451 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 452 args = [] 453 for arg in self.args: 454 if isinstance(arg, YAMLTag): 455 args.append(arg.resolve(entry, blueprint)) 456 else: 457 args.append(arg) 458 459 if not args: 460 raise EntryInvalidError.from_entry( 461 "At least one value is required after mode selection.", entry 462 ) 463 464 try: 465 comparator = self._COMPARATORS[self.mode.upper()] 466 return comparator(tuple(bool(x) for x in args)) 467 except (TypeError, KeyError) as exc: 468 raise EntryInvalidError.from_entry(exc, entry) from exc
Implement yaml tag logic
471class If(YAMLTag): 472 """Select YAML to use based on condition""" 473 474 condition: Any 475 when_true: Any 476 when_false: Any 477 478 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 479 super().__init__() 480 self.condition = loader.construct_object(node.value[0]) 481 if len(node.value) == 1: 482 self.when_true = True 483 self.when_false = False 484 else: 485 self.when_true = loader.construct_object(node.value[1]) 486 self.when_false = loader.construct_object(node.value[2]) 487 488 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 489 if isinstance(self.condition, YAMLTag): 490 condition = self.condition.resolve(entry, blueprint) 491 else: 492 condition = self.condition 493 494 try: 495 return entry.tag_resolver( 496 self.when_true if condition else self.when_false, 497 blueprint, 498 ) 499 except TypeError as exc: 500 raise EntryInvalidError.from_entry(exc, entry) from exc
Select YAML to use based on condition
478 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 479 super().__init__() 480 self.condition = loader.construct_object(node.value[0]) 481 if len(node.value) == 1: 482 self.when_true = True 483 self.when_false = False 484 else: 485 self.when_true = loader.construct_object(node.value[1]) 486 self.when_false = loader.construct_object(node.value[2])
488 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 489 if isinstance(self.condition, YAMLTag): 490 condition = self.condition.resolve(entry, blueprint) 491 else: 492 condition = self.condition 493 494 try: 495 return entry.tag_resolver( 496 self.when_true if condition else self.when_false, 497 blueprint, 498 ) 499 except TypeError as exc: 500 raise EntryInvalidError.from_entry(exc, entry) from exc
Implement yaml tag logic
503class Enumerate(YAMLTag, YAMLTagContext): 504 """Iterate over an iterable.""" 505 506 iterable: YAMLTag | Iterable 507 item_body: Any 508 output_body: Literal["SEQ", "MAP"] 509 510 _OUTPUT_BODIES = { 511 "SEQ": (list, lambda a, b: [*a, b]), 512 "MAP": ( 513 dict, 514 lambda a, b: always_merger.merge(a, {b[0]: b[1]} if isinstance(b, tuple | list) else b), 515 ), 516 } 517 518 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 519 super().__init__() 520 self.iterable = loader.construct_object(node.value[0]) 521 self.output_body = loader.construct_object(node.value[1]) 522 self.item_body = loader.construct_object(node.value[2]) 523 self.__current_context: tuple[Any, Any] = tuple() 524 525 def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 526 return self.__current_context 527 528 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 529 if isinstance(self.iterable, EnumeratedItem) and self.iterable.depth == 0: 530 raise EntryInvalidError.from_entry( 531 f"{self.__class__.__name__} tag's iterable references this tag's context. " 532 "This is a noop. Check you are setting depth bigger than 0.", 533 entry, 534 ) 535 536 if isinstance(self.iterable, YAMLTag): 537 iterable = self.iterable.resolve(entry, blueprint) 538 else: 539 iterable = self.iterable 540 541 if not isinstance(iterable, Iterable): 542 raise EntryInvalidError.from_entry( 543 f"{self.__class__.__name__}'s iterable must be an iterable " 544 "such as a sequence or a mapping", 545 entry, 546 ) 547 548 if isinstance(iterable, Mapping): 549 iterable = tuple(iterable.items()) 550 else: 551 iterable = tuple(enumerate(iterable)) 552 553 try: 554 output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()] 555 except KeyError as exc: 556 raise EntryInvalidError.from_entry(exc, entry) from exc 557 558 result = output_class() 559 560 self.__current_context = tuple() 561 562 try: 563 for item in iterable: 564 self.__current_context = item 565 resolved_body = entry.tag_resolver(self.item_body, blueprint) 566 result = add_fn(result, resolved_body) 567 if not isinstance(result, output_class): 568 raise EntryInvalidError.from_entry( 569 f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry 570 ) 571 finally: 572 self.__current_context = tuple() 573 574 return result
Iterate over an iterable.
518 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 519 super().__init__() 520 self.iterable = loader.construct_object(node.value[0]) 521 self.output_body = loader.construct_object(node.value[1]) 522 self.item_body = loader.construct_object(node.value[2]) 523 self.__current_context: tuple[Any, Any] = tuple()
525 def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 526 return self.__current_context
Implement yaml tag context logic
528 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 529 if isinstance(self.iterable, EnumeratedItem) and self.iterable.depth == 0: 530 raise EntryInvalidError.from_entry( 531 f"{self.__class__.__name__} tag's iterable references this tag's context. " 532 "This is a noop. Check you are setting depth bigger than 0.", 533 entry, 534 ) 535 536 if isinstance(self.iterable, YAMLTag): 537 iterable = self.iterable.resolve(entry, blueprint) 538 else: 539 iterable = self.iterable 540 541 if not isinstance(iterable, Iterable): 542 raise EntryInvalidError.from_entry( 543 f"{self.__class__.__name__}'s iterable must be an iterable " 544 "such as a sequence or a mapping", 545 entry, 546 ) 547 548 if isinstance(iterable, Mapping): 549 iterable = tuple(iterable.items()) 550 else: 551 iterable = tuple(enumerate(iterable)) 552 553 try: 554 output_class, add_fn = self._OUTPUT_BODIES[self.output_body.upper()] 555 except KeyError as exc: 556 raise EntryInvalidError.from_entry(exc, entry) from exc 557 558 result = output_class() 559 560 self.__current_context = tuple() 561 562 try: 563 for item in iterable: 564 self.__current_context = item 565 resolved_body = entry.tag_resolver(self.item_body, blueprint) 566 result = add_fn(result, resolved_body) 567 if not isinstance(result, output_class): 568 raise EntryInvalidError.from_entry( 569 f"Invalid {self.__class__.__name__} item found: {resolved_body}", entry 570 ) 571 finally: 572 self.__current_context = tuple() 573 574 return result
Implement yaml tag logic
577class EnumeratedItem(YAMLTag): 578 """Get the current item value and index provided by an Enumerate tag context""" 579 580 depth: int 581 582 _SUPPORTED_CONTEXT_TAGS = (Enumerate,) 583 584 def __init__(self, _loader: BlueprintLoader, node: ScalarNode) -> None: 585 super().__init__() 586 self.depth = int(node.value) 587 588 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 589 try: 590 context_tag: Enumerate = entry.get_tag_context( 591 depth=self.depth, 592 context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS, 593 ) 594 except ValueError as exc: 595 if self.depth == 0: 596 raise EntryInvalidError.from_entry( 597 f"{self.__class__.__name__} tags are only usable " 598 f"inside an {Enumerate.__name__} tag", 599 entry, 600 ) from exc 601 602 raise EntryInvalidError.from_entry( 603 f"{self.__class__.__name__} tag: {exc}", entry 604 ) from exc 605 606 return context_tag.get_context(entry, blueprint)
Get the current item value and index provided by an Enumerate tag context
588 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 589 try: 590 context_tag: Enumerate = entry.get_tag_context( 591 depth=self.depth, 592 context_tag_type=EnumeratedItem._SUPPORTED_CONTEXT_TAGS, 593 ) 594 except ValueError as exc: 595 if self.depth == 0: 596 raise EntryInvalidError.from_entry( 597 f"{self.__class__.__name__} tags are only usable " 598 f"inside an {Enumerate.__name__} tag", 599 entry, 600 ) from exc 601 602 raise EntryInvalidError.from_entry( 603 f"{self.__class__.__name__} tag: {exc}", entry 604 ) from exc 605 606 return context_tag.get_context(entry, blueprint)
Implement yaml tag logic
609class Index(EnumeratedItem): 610 """Get the current item index provided by an Enumerate tag context""" 611 612 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 613 context = super().resolve(entry, blueprint) 614 615 try: 616 return context[0] 617 except IndexError as exc: # pragma: no cover 618 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
Get the current item index provided by an Enumerate tag context
612 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 613 context = super().resolve(entry, blueprint) 614 615 try: 616 return context[0] 617 except IndexError as exc: # pragma: no cover 618 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
Implement yaml tag logic
Inherited Members
621class Value(EnumeratedItem): 622 """Get the current item value provided by an Enumerate tag context""" 623 624 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 625 context = super().resolve(entry, blueprint) 626 627 try: 628 return context[1] 629 except IndexError as exc: # pragma: no cover 630 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
Get the current item value provided by an Enumerate tag context
624 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 625 context = super().resolve(entry, blueprint) 626 627 try: 628 return context[1] 629 except IndexError as exc: # pragma: no cover 630 raise EntryInvalidError.from_entry(f"Empty/invalid context: {context}", entry) from exc
Implement yaml tag logic
Inherited Members
633class AtIndex(YAMLTag): 634 """Get value at index of a sequence or mapping""" 635 636 obj: YAMLTag | dict | list | tuple 637 attribute: int | str | YAMLTag 638 default: Any | UNSET 639 640 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 641 super().__init__() 642 self.obj = loader.construct_object(node.value[0]) 643 self.attribute = loader.construct_object(node.value[1]) 644 if len(node.value) == 2: # noqa: PLR2004 645 self.default = UNSET 646 else: 647 self.default = loader.construct_object(node.value[2]) 648 649 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 650 if isinstance(self.obj, YAMLTag): 651 obj = self.obj.resolve(entry, blueprint) 652 else: 653 obj = self.obj 654 if isinstance(self.attribute, YAMLTag): 655 attribute = self.attribute.resolve(entry, blueprint) 656 else: 657 attribute = self.attribute 658 659 if isinstance(obj, list | tuple): 660 try: 661 return obj[attribute] 662 except TypeError as exc: 663 raise EntryInvalidError.from_entry( 664 f"Invalid index for list: {attribute}", entry 665 ) from exc 666 except IndexError as exc: 667 if self.default is UNSET: 668 raise EntryInvalidError.from_entry( 669 f"Index out of range: {attribute}", entry 670 ) from exc 671 return self.default 672 if attribute in obj: 673 return obj[attribute] 674 else: 675 if self.default is UNSET: 676 raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry) 677 return self.default
Get value at index of a sequence or mapping
640 def __init__(self, loader: BlueprintLoader, node: SequenceNode) -> None: 641 super().__init__() 642 self.obj = loader.construct_object(node.value[0]) 643 self.attribute = loader.construct_object(node.value[1]) 644 if len(node.value) == 2: # noqa: PLR2004 645 self.default = UNSET 646 else: 647 self.default = loader.construct_object(node.value[2])
649 def resolve(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any: 650 if isinstance(self.obj, YAMLTag): 651 obj = self.obj.resolve(entry, blueprint) 652 else: 653 obj = self.obj 654 if isinstance(self.attribute, YAMLTag): 655 attribute = self.attribute.resolve(entry, blueprint) 656 else: 657 attribute = self.attribute 658 659 if isinstance(obj, list | tuple): 660 try: 661 return obj[attribute] 662 except TypeError as exc: 663 raise EntryInvalidError.from_entry( 664 f"Invalid index for list: {attribute}", entry 665 ) from exc 666 except IndexError as exc: 667 if self.default is UNSET: 668 raise EntryInvalidError.from_entry( 669 f"Index out of range: {attribute}", entry 670 ) from exc 671 return self.default 672 if attribute in obj: 673 return obj[attribute] 674 else: 675 if self.default is UNSET: 676 raise EntryInvalidError.from_entry(f"Key does not exist: {attribute}", entry) 677 return self.default
Implement yaml tag logic
680class BlueprintDumper(SafeDumper): 681 """Dump dataclasses to yaml""" 682 683 default_flow_style = False 684 685 def __init__(self, *args, **kwargs): 686 super().__init__(*args, **kwargs) 687 self.add_representer(UUID, lambda self, data: self.represent_str(str(data))) 688 self.add_representer(OrderedDict, lambda self, data: self.represent_dict(dict(data))) 689 self.add_representer(Enum, lambda self, data: self.represent_str(data.value)) 690 self.add_representer( 691 BlueprintEntryDesiredState, lambda self, data: self.represent_str(data.value) 692 ) 693 self.add_representer(None, lambda self, data: self.represent_str(str(data))) 694 695 def ignore_aliases(self, data): 696 """Don't use any YAML anchors""" 697 return True 698 699 def represent(self, data) -> None: 700 if is_dataclass(data): 701 702 def factory(items): 703 final_dict = dict(items) 704 # Remove internal state variables 705 final_dict.pop("_state", None) 706 # Future-proof to only remove the ID if we don't set a value 707 if "id" in final_dict and final_dict.get("id") is None: 708 final_dict.pop("id") 709 return final_dict 710 711 data = asdict(data, dict_factory=factory) 712 return super().represent(data)
Dump dataclasses to yaml
685 def __init__(self, *args, **kwargs): 686 super().__init__(*args, **kwargs) 687 self.add_representer(UUID, lambda self, data: self.represent_str(str(data))) 688 self.add_representer(OrderedDict, lambda self, data: self.represent_dict(dict(data))) 689 self.add_representer(Enum, lambda self, data: self.represent_str(data.value)) 690 self.add_representer( 691 BlueprintEntryDesiredState, lambda self, data: self.represent_str(data.value) 692 ) 693 self.add_representer(None, lambda self, data: self.represent_str(str(data)))
699 def represent(self, data) -> None: 700 if is_dataclass(data): 701 702 def factory(items): 703 final_dict = dict(items) 704 # Remove internal state variables 705 final_dict.pop("_state", None) 706 # Future-proof to only remove the ID if we don't set a value 707 if "id" in final_dict and final_dict.get("id") is None: 708 final_dict.pop("id") 709 return final_dict 710 711 data = asdict(data, dict_factory=factory) 712 return super().represent(data)
715class BlueprintLoader(SafeLoader): 716 """Loader for blueprints with custom tag support""" 717 718 def __init__(self, *args, **kwargs): 719 super().__init__(*args, **kwargs) 720 self.add_constructor("!KeyOf", KeyOf) 721 self.add_constructor("!Find", Find) 722 self.add_constructor("!FindObject", FindObject) 723 self.add_constructor("!Context", Context) 724 self.add_constructor("!Format", Format) 725 self.add_constructor("!Condition", Condition) 726 self.add_constructor("!If", If) 727 self.add_constructor("!Env", Env) 728 self.add_constructor("!File", File) 729 self.add_constructor("!Enumerate", Enumerate) 730 self.add_constructor("!Value", Value) 731 self.add_constructor("!Index", Index) 732 self.add_constructor("!AtIndex", AtIndex) 733 self.add_constructor("!ParseJSON", ParseJSON)
Loader for blueprints with custom tag support
718 def __init__(self, *args, **kwargs): 719 super().__init__(*args, **kwargs) 720 self.add_constructor("!KeyOf", KeyOf) 721 self.add_constructor("!Find", Find) 722 self.add_constructor("!FindObject", FindObject) 723 self.add_constructor("!Context", Context) 724 self.add_constructor("!Format", Format) 725 self.add_constructor("!Condition", Condition) 726 self.add_constructor("!If", If) 727 self.add_constructor("!Env", Env) 728 self.add_constructor("!File", File) 729 self.add_constructor("!Enumerate", Enumerate) 730 self.add_constructor("!Value", Value) 731 self.add_constructor("!Index", Index) 732 self.add_constructor("!AtIndex", AtIndex) 733 self.add_constructor("!ParseJSON", ParseJSON)
Initialize the scanner.
736class EntryInvalidError(SentryIgnoredException): 737 """Error raised when an entry is invalid""" 738 739 entry_model: str | None 740 entry_id: str | None 741 validation_error: ValidationError | None 742 serializer: Serializer | None = None 743 744 def __init__( 745 self, *args: object, validation_error: ValidationError | None = None, **kwargs 746 ) -> None: 747 super().__init__(*args) 748 self.entry_model = None 749 self.entry_id = None 750 self.validation_error = validation_error 751 for key, value in kwargs.items(): 752 setattr(self, key, value) 753 754 @staticmethod 755 def from_entry( 756 msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs 757 ) -> EntryInvalidError: 758 """Create EntryInvalidError with the context of an entry""" 759 error = EntryInvalidError(msg_or_exc, *args, **kwargs) 760 if isinstance(msg_or_exc, ValidationError): 761 error.validation_error = msg_or_exc 762 # Make sure the model and id are strings, depending where the error happens 763 # they might still be YAMLTag instances 764 error.entry_model = str(entry.model) 765 error.entry_id = str(entry.id) 766 return error
Error raised when an entry is invalid
744 def __init__( 745 self, *args: object, validation_error: ValidationError | None = None, **kwargs 746 ) -> None: 747 super().__init__(*args) 748 self.entry_model = None 749 self.entry_id = None 750 self.validation_error = validation_error 751 for key, value in kwargs.items(): 752 setattr(self, key, value)
754 @staticmethod 755 def from_entry( 756 msg_or_exc: str | Exception, entry: BlueprintEntry, *args, **kwargs 757 ) -> EntryInvalidError: 758 """Create EntryInvalidError with the context of an entry""" 759 error = EntryInvalidError(msg_or_exc, *args, **kwargs) 760 if isinstance(msg_or_exc, ValidationError): 761 error.validation_error = msg_or_exc 762 # Make sure the model and id are strings, depending where the error happens 763 # they might still be YAMLTag instances 764 error.entry_model = str(entry.model) 765 error.entry_id = str(entry.id) 766 return error
Create EntryInvalidError with the context of an entry