authentik.blueprints.v1.tasks

v1 blueprints tasks

  1"""v1 blueprints tasks"""
  2
  3from dataclasses import asdict, dataclass, field
  4from hashlib import sha512
  5from pathlib import Path
  6from sys import platform
  7from uuid import UUID
  8
  9from dacite.core import from_dict
 10from django.conf import settings
 11from django.db import DatabaseError, InternalError, ProgrammingError
 12from django.utils.text import slugify
 13from django.utils.timezone import now
 14from django.utils.translation import gettext_lazy as _
 15from dramatiq.actor import actor
 16from dramatiq.middleware import Middleware
 17from structlog.stdlib import get_logger
 18from watchdog.events import (
 19    FileCreatedEvent,
 20    FileModifiedEvent,
 21    FileSystemEvent,
 22    FileSystemEventHandler,
 23)
 24from watchdog.observers import Observer
 25from yaml import load
 26from yaml.error import YAMLError
 27
 28from authentik.blueprints.models import (
 29    BlueprintInstance,
 30    BlueprintInstanceStatus,
 31    BlueprintRetrievalFailed,
 32)
 33from authentik.blueprints.v1.common import BlueprintLoader, BlueprintMetadata, EntryInvalidError
 34from authentik.blueprints.v1.importer import Importer
 35from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE
 36from authentik.blueprints.v1.oci import OCI_PREFIX
 37from authentik.events.logs import capture_logs
 38from authentik.events.utils import sanitize_dict
 39from authentik.lib.config import CONFIG
 40from authentik.tasks.apps import PRIORITY_HIGH
 41from authentik.tasks.middleware import CurrentTask
 42from authentik.tasks.schedules.models import Schedule
 43from authentik.tenants.models import Tenant
 44
 45LOGGER = get_logger()
 46
 47
 48@dataclass
 49class BlueprintFile:
 50    """Basic info about a blueprint file"""
 51
 52    path: str
 53    version: int
 54    hash: str
 55    last_m: int
 56    meta: BlueprintMetadata | None = field(default=None)
 57
 58
 59class BlueprintWatcherMiddleware(Middleware):
 60    def start_blueprint_watcher(self):
 61        """Start blueprint watcher"""
 62        observer = Observer()
 63        kwargs = {}
 64        if platform.startswith("linux"):
 65            kwargs["event_filter"] = (FileCreatedEvent, FileModifiedEvent)
 66        observer.schedule(
 67            BlueprintEventHandler(), CONFIG.get("blueprints_dir"), recursive=True, **kwargs
 68        )
 69        observer.start()
 70
 71    def after_worker_boot(self, broker, worker):
 72        if not settings.TEST:
 73            self.start_blueprint_watcher()
 74
 75
 76class BlueprintEventHandler(FileSystemEventHandler):
 77    """Event handler for blueprint events"""
 78
 79    # We only ever get creation and modification events.
 80    # See the creation of the Observer instance above for the event filtering.
 81
 82    # Even though we filter to only get file events, we might still get
 83    # directory events as some implementations such as inotify do not support
 84    # filtering on file/directory.
 85
 86    def dispatch(self, event: FileSystemEvent) -> None:
 87        """Call specific event handler method. Ignores directory changes."""
 88        if event.is_directory:
 89            return None
 90        return super().dispatch(event)
 91
 92    def on_created(self, event: FileSystemEvent):
 93        """Process file creation"""
 94        LOGGER.debug("new blueprint file created, starting discovery")
 95        for tenant in Tenant.objects.filter(ready=True):
 96            with tenant:
 97                Schedule.dispatch_by_actor(blueprints_discovery)
 98
 99    def on_modified(self, event: FileSystemEvent):
100        """Process file modification"""
101        path = Path(event.src_path)
102        root = Path(CONFIG.get("blueprints_dir")).absolute()
103        rel_path = str(path.relative_to(root))
104        for tenant in Tenant.objects.filter(ready=True):
105            with tenant:
106                for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
107                    LOGGER.debug("modified blueprint file, starting apply", instance=instance)
108                    apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
109
110
@actor(
    description=_("Find blueprints as `blueprints_find` does, but return a safe dict."),
    priority=PRIORITY_HIGH,
)
def blueprints_find_dict():
    """Return every blueprint found by `blueprints_find` as a sanitized dict."""
    return [sanitize_dict(asdict(found)) for found in blueprints_find()]
120
121
def blueprints_find() -> list[BlueprintFile]:
    """Find blueprints and return valid ones

    Recursively scans the configured ``blueprints_dir`` for ``*.yaml`` files.
    A file is skipped when it lives below a dot-prefixed (hidden) path
    component, cannot be parsed as YAML, is empty, is not a mapping at the top
    level, or declares a version other than 1.

    Returns:
        One `BlueprintFile` per valid blueprint, with its path relative to the
        blueprints directory, the SHA-512 hex digest of its contents and its
        modification time in whole seconds.
    """
    blueprints = []
    root = Path(CONFIG.get("blueprints_dir"))
    # rglob() is already recursive, so no leading "**/" is needed in the pattern
    for path in root.rglob("*.yaml"):
        rel_path = path.relative_to(root)
        # Check if any part below the blueprints directory starts with a dot and
        # assume a hidden file. Only the relative part is inspected so that a
        # dot-directory in the location of the blueprints directory itself does
        # not hide every blueprint.
        if any(part.startswith(".") for part in rel_path.parts):
            continue
        with open(path, encoding="utf-8") as blueprint_file:
            try:
                # NOTE(review): safety of load() depends on BlueprintLoader,
                # which is defined elsewhere - confirm it is a safe loader.
                raw_blueprint = load(blueprint_file.read(), BlueprintLoader)
            except YAMLError as exc:
                raw_blueprint = None
                LOGGER.warning("failed to parse blueprint", exc=exc, path=str(rel_path))
        if not raw_blueprint:
            continue
        if not isinstance(raw_blueprint, dict):
            # A scalar or list document has no .get(); skip it instead of
            # letting one stray YAML file abort the whole scan.
            LOGGER.warning("blueprint is not a mapping", path=str(rel_path))
            continue
        metadata = raw_blueprint.get("metadata", None)
        version = raw_blueprint.get("version", 1)
        if version != 1:
            LOGGER.warning("invalid blueprint version", version=version, path=str(rel_path))
            continue
        file_hash = sha512(path.read_bytes()).hexdigest()
        blueprint = BlueprintFile(str(rel_path), version, file_hash, int(path.stat().st_mtime))
        blueprint.meta = from_dict(BlueprintMetadata, metadata) if metadata else None
        blueprints.append(blueprint)
    return blueprints
149
150
@actor(description=_("Find blueprints and check if they need to be created in the database."))
def blueprints_discovery(path: str | None = None):
    """Check every discovered blueprint file, or only the one matching `path`.

    When `path` is given, blueprints with a different path are skipped.
    The number of checked files is reported on the current task.
    """
    task = CurrentTask.get_task()
    checked = 0
    for candidate in blueprints_find():
        # No filter given, or this is exactly the requested file
        if not path or candidate.path == path:
            check_blueprint_v1_file(candidate)
            checked += 1
    task.info(f"Successfully imported {checked} files.")
161
162
def check_blueprint_v1_file(blueprint: BlueprintFile):
    """Check if blueprint should be imported

    Does nothing when the blueprint's metadata labels set
    ``LABEL_AUTHENTIK_INSTANTIATE`` to "false" (case-insensitive). Otherwise a
    `BlueprintInstance` is created for the file's path if none exists yet, and
    `apply_blueprint` is scheduled whenever the instance's last applied hash
    differs from the hash of the file.
    """
    # Evaluate the opt-out label first so that skipped blueprints do not cost
    # a database lookup.
    if (
        blueprint.meta
        and blueprint.meta.labels.get(LABEL_AUTHENTIK_INSTANTIATE, "").lower() == "false"
    ):
        return
    instance: BlueprintInstance | None = BlueprintInstance.objects.filter(
        path=blueprint.path
    ).first()
    if not instance:
        instance = BlueprintInstance(
            name=blueprint.meta.name if blueprint.meta else str(blueprint.path),
            path=blueprint.path,
            context={},
            status=BlueprintInstanceStatus.UNKNOWN,
            enabled=True,
            managed_models=[],
            metadata={},
        )
        instance.save()
        LOGGER.info(
            "Creating new blueprint instance from file", instance=instance, path=instance.path
        )
    if instance.last_applied_hash != blueprint.hash:
        LOGGER.info("Applying blueprint due to changed file", instance=instance, path=instance.path)
        apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
188
189
@actor(description=_("Apply single blueprint."))
def apply_blueprint(instance_pk: UUID):
    """Retrieve, validate and apply the blueprint instance with the given primary key.

    The task is skipped when the instance does not exist or is disabled.
    On validation or apply failure the instance status is set to ERROR and the
    collected logs are attached to the task. On success the status, the hash of
    the applied content and the apply timestamp are recorded. Errors listed in
    the ``except`` clause are reported on the task instead of being raised.

    The instance, if one was found, is saved exactly once, in the ``finally``
    clause, regardless of which path was taken.
    """
    self = CurrentTask.get_task()
    # Provisional uid until the instance (and therefore its name) is known
    self.set_uid(str(instance_pk))
    instance: BlueprintInstance | None = None
    try:
        instance = BlueprintInstance.objects.filter(pk=instance_pk).first()
        if not instance:
            self.warning(f"Could not find blueprint {instance_pk}, skipping")
            return
        self.set_uid(slugify(instance.name))
        if not instance.enabled:
            self.info(f"Blueprint {instance.name} is disabled, skipping")
            return
        blueprint_content = instance.retrieve()
        file_hash = sha512(blueprint_content.encode()).hexdigest()
        importer = Importer.from_string(blueprint_content, instance.context)
        if importer.blueprint.metadata:
            instance.metadata = asdict(importer.blueprint.metadata)
        valid, logs = importer.validate()
        if not valid:
            # No explicit save here: the finally clause persists the status
            instance.status = BlueprintInstanceStatus.ERROR
            self.logs(logs)
            return
        with capture_logs() as logs:
            applied = importer.apply()
            if not applied:
                # No explicit save here: the finally clause persists the status
                instance.status = BlueprintInstanceStatus.ERROR
                self.logs(logs)
                return
        instance.status = BlueprintInstanceStatus.SUCCESSFUL
        instance.last_applied_hash = file_hash
        instance.last_applied = now()
    except (
        OSError,
        DatabaseError,
        ProgrammingError,
        InternalError,
        BlueprintRetrievalFailed,
        EntryInvalidError,
    ) as exc:
        if instance:
            instance.status = BlueprintInstanceStatus.ERROR
        self.error(exc)
    finally:
        if instance:
            instance.save()
239
240
@actor(description=_("Remove blueprints which couldn't be fetched."))
def clear_failed_blueprints():
    """Delete every non-OCI blueprint instance whose content can no longer be retrieved."""
    # OCI blueprints are left alone since they might only be temporarily unavailable
    candidates = BlueprintInstance.objects.exclude(path__startswith=OCI_PREFIX)
    for instance in candidates:
        try:
            instance.retrieve()
        except BlueprintRetrievalFailed:
            instance.delete()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass
class BlueprintFile:
49@dataclass
50class BlueprintFile:
51    """Basic info about a blueprint file"""
52
53    path: str
54    version: int
55    hash: str
56    last_m: int
57    meta: BlueprintMetadata | None = field(default=None)

Basic info about a blueprint file

BlueprintFile( path: str, version: int, hash: str, last_m: int, meta: authentik.blueprints.v1.common.BlueprintMetadata | None = None)
path: str
version: int
hash: str
last_m: int
class BlueprintWatcherMiddleware(dramatiq.middleware.middleware.Middleware):
60class BlueprintWatcherMiddleware(Middleware):
61    def start_blueprint_watcher(self):
62        """Start blueprint watcher"""
63        observer = Observer()
64        kwargs = {}
65        if platform.startswith("linux"):
66            kwargs["event_filter"] = (FileCreatedEvent, FileModifiedEvent)
67        observer.schedule(
68            BlueprintEventHandler(), CONFIG.get("blueprints_dir"), recursive=True, **kwargs
69        )
70        observer.start()
71
72    def after_worker_boot(self, broker, worker):
73        if not settings.TEST:
74            self.start_blueprint_watcher()

Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.

def start_blueprint_watcher(self):
61    def start_blueprint_watcher(self):
62        """Start blueprint watcher"""
63        observer = Observer()
64        kwargs = {}
65        if platform.startswith("linux"):
66            kwargs["event_filter"] = (FileCreatedEvent, FileModifiedEvent)
67        observer.schedule(
68            BlueprintEventHandler(), CONFIG.get("blueprints_dir"), recursive=True, **kwargs
69        )
70        observer.start()

Start blueprint watcher

def after_worker_boot(self, broker, worker):
72    def after_worker_boot(self, broker, worker):
73        if not settings.TEST:
74            self.start_blueprint_watcher()

Called after the worker process has started up.

class BlueprintEventHandler(watchdog.events.FileSystemEventHandler):
 77class BlueprintEventHandler(FileSystemEventHandler):
 78    """Event handler for blueprint events"""
 79
 80    # We only ever get creation and modification events.
 81    # See the creation of the Observer instance above for the event filtering.
 82
 83    # Even though we filter to only get file events, we might still get
 84    # directory events as some implementations such as inotify do not support
 85    # filtering on file/directory.
 86
 87    def dispatch(self, event: FileSystemEvent) -> None:
 88        """Call specific event handler method. Ignores directory changes."""
 89        if event.is_directory:
 90            return None
 91        return super().dispatch(event)
 92
 93    def on_created(self, event: FileSystemEvent):
 94        """Process file creation"""
 95        LOGGER.debug("new blueprint file created, starting discovery")
 96        for tenant in Tenant.objects.filter(ready=True):
 97            with tenant:
 98                Schedule.dispatch_by_actor(blueprints_discovery)
 99
100    def on_modified(self, event: FileSystemEvent):
101        """Process file modification"""
102        path = Path(event.src_path)
103        root = Path(CONFIG.get("blueprints_dir")).absolute()
104        rel_path = str(path.relative_to(root))
105        for tenant in Tenant.objects.filter(ready=True):
106            with tenant:
107                for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
108                    LOGGER.debug("modified blueprint file, starting apply", instance=instance)
109                    apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)

Event handler for blueprint events

def dispatch(self, event: watchdog.events.FileSystemEvent) -> None:
87    def dispatch(self, event: FileSystemEvent) -> None:
88        """Call specific event handler method. Ignores directory changes."""
89        if event.is_directory:
90            return None
91        return super().dispatch(event)

Call specific event handler method. Ignores directory changes.

def on_created(self, event: watchdog.events.FileSystemEvent):
93    def on_created(self, event: FileSystemEvent):
94        """Process file creation"""
95        LOGGER.debug("new blueprint file created, starting discovery")
96        for tenant in Tenant.objects.filter(ready=True):
97            with tenant:
98                Schedule.dispatch_by_actor(blueprints_discovery)

Process file creation

def on_modified(self, event: watchdog.events.FileSystemEvent):
100    def on_modified(self, event: FileSystemEvent):
101        """Process file modification"""
102        path = Path(event.src_path)
103        root = Path(CONFIG.get("blueprints_dir")).absolute()
104        rel_path = str(path.relative_to(root))
105        for tenant in Tenant.objects.filter(ready=True):
106            with tenant:
107                for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
108                    LOGGER.debug("modified blueprint file, starting apply", instance=instance)
109                    apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)

Process file modification

def blueprints_find() -> list[BlueprintFile]:
123def blueprints_find() -> list[BlueprintFile]:
124    """Find blueprints and return valid ones"""
125    blueprints = []
126    root = Path(CONFIG.get("blueprints_dir"))
127    for path in root.rglob("**/*.yaml"):
128        rel_path = path.relative_to(root)
129        # Check if any part in the path starts with a dot and assume a hidden file
130        if any(part for part in path.parts if part.startswith(".")):
131            continue
132        with open(path, encoding="utf-8") as blueprint_file:
133            try:
134                raw_blueprint = load(blueprint_file.read(), BlueprintLoader)
135            except YAMLError as exc:
136                raw_blueprint = None
137                LOGGER.warning("failed to parse blueprint", exc=exc, path=str(rel_path))
138            if not raw_blueprint:
139                continue
140            metadata = raw_blueprint.get("metadata", None)
141            version = raw_blueprint.get("version", 1)
142            if version != 1:
143                LOGGER.warning("invalid blueprint version", version=version, path=str(rel_path))
144                continue
145        file_hash = sha512(path.read_bytes()).hexdigest()
146        blueprint = BlueprintFile(str(rel_path), version, file_hash, int(path.stat().st_mtime))
147        blueprint.meta = from_dict(BlueprintMetadata, metadata) if metadata else None
148        blueprints.append(blueprint)
149    return blueprints

Find blueprints and return valid ones

def check_blueprint_v1_file(blueprint: BlueprintFile):
164def check_blueprint_v1_file(blueprint: BlueprintFile):
165    """Check if blueprint should be imported"""
166    instance: BlueprintInstance = BlueprintInstance.objects.filter(path=blueprint.path).first()
167    if (
168        blueprint.meta
169        and blueprint.meta.labels.get(LABEL_AUTHENTIK_INSTANTIATE, "").lower() == "false"
170    ):
171        return
172    if not instance:
173        instance = BlueprintInstance(
174            name=blueprint.meta.name if blueprint.meta else str(blueprint.path),
175            path=blueprint.path,
176            context={},
177            status=BlueprintInstanceStatus.UNKNOWN,
178            enabled=True,
179            managed_models=[],
180            metadata={},
181        )
182        instance.save()
183        LOGGER.info(
184            "Creating new blueprint instance from file", instance=instance, path=instance.path
185        )
186    if instance.last_applied_hash != blueprint.hash:
187        LOGGER.info("Applying blueprint due to changed file", instance=instance, path=instance.path)
188        apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)

Check if blueprint should be imported