authentik.blueprints.v1.tasks
v1 blueprints tasks
"""v1 blueprints tasks"""

from dataclasses import asdict, dataclass, field
from hashlib import sha512
from pathlib import Path
from sys import platform
from uuid import UUID

from dacite.core import from_dict
from django.conf import settings
from django.db import DatabaseError, InternalError, ProgrammingError
from django.utils.text import slugify
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from dramatiq.actor import actor
from dramatiq.middleware import Middleware
from structlog.stdlib import get_logger
from watchdog.events import (
    FileCreatedEvent,
    FileModifiedEvent,
    FileSystemEvent,
    FileSystemEventHandler,
)
from watchdog.observers import Observer
from yaml import load
from yaml.error import YAMLError

from authentik.blueprints.models import (
    BlueprintInstance,
    BlueprintInstanceStatus,
    BlueprintRetrievalFailed,
)
from authentik.blueprints.v1.common import BlueprintLoader, BlueprintMetadata, EntryInvalidError
from authentik.blueprints.v1.importer import Importer
from authentik.blueprints.v1.labels import LABEL_AUTHENTIK_INSTANTIATE
from authentik.blueprints.v1.oci import OCI_PREFIX
from authentik.events.logs import capture_logs
from authentik.events.utils import sanitize_dict
from authentik.lib.config import CONFIG
from authentik.tasks.apps import PRIORITY_HIGH
from authentik.tasks.middleware import CurrentTask
from authentik.tasks.schedules.models import Schedule
from authentik.tenants.models import Tenant

LOGGER = get_logger()


@dataclass
class BlueprintFile:
    """Basic info about a blueprint file"""

    # Path of the file relative to the configured blueprints directory
    path: str
    # Blueprint format version read from the file (defaults to 1 when absent)
    version: int
    # Hex-encoded SHA-512 digest of the raw file contents
    hash: str
    # Modification time of the file, truncated to whole seconds
    last_m: int
    # Parsed ``metadata`` section, or None when the file has none
    meta: BlueprintMetadata | None = field(default=None)


class BlueprintWatcherMiddleware(Middleware):
    """Broker middleware that starts the blueprint file watcher once a worker has booted"""

    def start_blueprint_watcher(self):
        """Start blueprint watcher"""
        observer = Observer()
        kwargs = {}
        # The event filter is only passed on Linux; other platforms get every event type.
        if platform.startswith("linux"):
            kwargs["event_filter"] = (FileCreatedEvent, FileModifiedEvent)
        observer.schedule(
            BlueprintEventHandler(), CONFIG.get("blueprints_dir"), recursive=True, **kwargs
        )
        observer.start()

    def after_worker_boot(self, broker, worker):
        """Start the watcher after worker boot, unless ``settings.TEST`` is set"""
        if not settings.TEST:
            self.start_blueprint_watcher()


class BlueprintEventHandler(FileSystemEventHandler):
    """Event handler for blueprint events"""

    # On Linux we only ever get creation and modification events.
    # See the creation of the Observer instance above for the event filtering.

    # Even though we filter to only get file events, we might still get
    # directory events as some implementations such as inotify do not support
    # filtering on file/directory.

    def dispatch(self, event: FileSystemEvent) -> None:
        """Call specific event handler method. Ignores directory changes."""
        if event.is_directory:
            return None
        return super().dispatch(event)

    def on_created(self, event: FileSystemEvent):
        """Process file creation"""
        LOGGER.debug("new blueprint file created, starting discovery")
        for tenant in Tenant.objects.filter(ready=True):
            with tenant:
                Schedule.dispatch_by_actor(blueprints_discovery)

    def on_modified(self, event: FileSystemEvent):
        """Process file modification

        Queues ``apply_blueprint`` for every enabled instance, in every ready
        tenant, whose path matches the modified file.
        """
        root = Path(CONFIG.get("blueprints_dir")).absolute()
        try:
            # Normalise the event path the same way as the root: the watcher is
            # scheduled with the configured (possibly relative) directory, and
            # relative_to() raises ValueError when only one side is absolute.
            rel_path = str(Path(event.src_path).absolute().relative_to(root))
        except ValueError:
            # Raising here would propagate out of the handler into watchdog's
            # dispatching code, so events outside the root are skipped instead.
            LOGGER.debug(
                "ignoring modified file outside of blueprints dir", path=str(event.src_path)
            )
            return
        for tenant in Tenant.objects.filter(ready=True):
            with tenant:
                for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
                    LOGGER.debug("modified blueprint file, starting apply", instance=instance)
                    apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)


@actor(
    description=_("Find blueprints as `blueprints_find` does, but return a safe dict."),
    priority=PRIORITY_HIGH,
)
def blueprints_find_dict():
    """Return every blueprint found by `blueprints_find` as a sanitized dict"""
    return [sanitize_dict(asdict(blueprint)) for blueprint in blueprints_find()]


def blueprints_find() -> list[BlueprintFile]:
    """Find blueprints and return valid ones

    Searches the configured ``blueprints_dir`` recursively for ``*.yaml`` files.
    Files are skipped when they live below a dot-prefixed path component, fail
    to parse, are empty, are not a YAML mapping, or declare a version other
    than 1.
    """
    blueprints = []
    root = Path(CONFIG.get("blueprints_dir"))
    for path in root.rglob("**/*.yaml"):
        rel_path = path.relative_to(root)
        # Only components below the root decide whether a file is hidden; checking
        # the full path would hide everything when the root itself contains a
        # dot-prefixed component (e.g. "~/.config/blueprints" or "../blueprints").
        if any(part.startswith(".") for part in rel_path.parts):
            continue
        with open(path, encoding="utf-8") as blueprint_file:
            try:
                # NOTE(review): safety of load() depends on BlueprintLoader being
                # a safe loader - confirm in authentik.blueprints.v1.common.
                raw_blueprint = load(blueprint_file.read(), BlueprintLoader)
            except YAMLError as exc:
                raw_blueprint = None
                LOGGER.warning("failed to parse blueprint", exc=exc, path=str(rel_path))
        if not raw_blueprint:
            continue
        # Valid YAML may still be a list or scalar, which has no .get(); skip it
        # instead of aborting the whole discovery with an AttributeError.
        if not isinstance(raw_blueprint, dict):
            LOGGER.warning("blueprint is not a mapping", path=str(rel_path))
            continue
        metadata = raw_blueprint.get("metadata", None)
        version = raw_blueprint.get("version", 1)
        if version != 1:
            LOGGER.warning("invalid blueprint version", version=version, path=str(rel_path))
            continue
        file_hash = sha512(path.read_bytes()).hexdigest()
        blueprint = BlueprintFile(str(rel_path), version, file_hash, int(path.stat().st_mtime))
        blueprint.meta = from_dict(BlueprintMetadata, metadata) if metadata else None
        blueprints.append(blueprint)
    return blueprints


@actor(description=_("Find blueprints and check if they need to be created in the database."))
def blueprints_discovery(path: str | None = None):
    """Check every discovered blueprint file, optionally restricted to a single relative path"""
    self = CurrentTask.get_task()
    count = 0
    for blueprint in blueprints_find():
        if path and blueprint.path != path:
            continue
        check_blueprint_v1_file(blueprint)
        count += 1
    self.info(f"Successfully imported {count} files.")


def check_blueprint_v1_file(blueprint: BlueprintFile):
    """Check if blueprint should be imported

    Creates a `BlueprintInstance` for the file when none exists for its path and
    queues `apply_blueprint` when the stored hash differs from the file's hash.
    """
    # Decide on the label first so that blueprints which must not be
    # instantiated do not cost a database query.
    if (
        blueprint.meta
        and blueprint.meta.labels.get(LABEL_AUTHENTIK_INSTANTIATE, "").lower() == "false"
    ):
        return
    instance: BlueprintInstance | None = BlueprintInstance.objects.filter(
        path=blueprint.path
    ).first()
    if not instance:
        instance = BlueprintInstance(
            name=blueprint.meta.name if blueprint.meta else str(blueprint.path),
            path=blueprint.path,
            context={},
            status=BlueprintInstanceStatus.UNKNOWN,
            enabled=True,
            managed_models=[],
            metadata={},
        )
        instance.save()
        LOGGER.info(
            "Creating new blueprint instance from file", instance=instance, path=instance.path
        )
    if instance.last_applied_hash != blueprint.hash:
        LOGGER.info("Applying blueprint due to changed file", instance=instance, path=instance.path)
        apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)


@actor(description=_("Apply single blueprint."))
def apply_blueprint(instance_pk: UUID):
    """Retrieve, validate and apply the blueprint instance with the given primary key

    The instance status is set to ERROR when validation or applying fails, or
    when one of the handled exceptions is raised, and to SUCCESSFUL otherwise.
    """
    self = CurrentTask.get_task()
    self.set_uid(str(instance_pk))
    instance: BlueprintInstance | None = None
    try:
        instance = BlueprintInstance.objects.filter(pk=instance_pk).first()
        if not instance:
            self.warning(f"Could not find blueprint {instance_pk}, skipping")
            return
        # Replace the pk-based uid with one derived from the instance name
        self.set_uid(slugify(instance.name))
        if not instance.enabled:
            self.info(f"Blueprint {instance.name} is disabled, skipping")
            return
        blueprint_content = instance.retrieve()
        file_hash = sha512(blueprint_content.encode()).hexdigest()
        importer = Importer.from_string(blueprint_content, instance.context)
        if importer.blueprint.metadata:
            instance.metadata = asdict(importer.blueprint.metadata)
        valid, logs = importer.validate()
        if not valid:
            instance.status = BlueprintInstanceStatus.ERROR
            instance.save()
            self.logs(logs)
            return
        with capture_logs() as logs:
            applied = importer.apply()
            if not applied:
                instance.status = BlueprintInstanceStatus.ERROR
                instance.save()
                self.logs(logs)
                return
        instance.status = BlueprintInstanceStatus.SUCCESSFUL
        instance.last_applied_hash = file_hash
        instance.last_applied = now()
    except (
        OSError,
        DatabaseError,
        ProgrammingError,
        InternalError,
        BlueprintRetrievalFailed,
        EntryInvalidError,
    ) as exc:
        if instance:
            instance.status = BlueprintInstanceStatus.ERROR
        self.error(exc)
    finally:
        # Runs on every exit path, including the early returns above
        if instance:
            instance.save()


@actor(description=_("Remove blueprints which couldn't be fetched."))
def clear_failed_blueprints():
    """Delete every non-OCI blueprint instance whose content can no longer be retrieved"""
    # Exclude OCI blueprints as those might be temporarily unavailable
    for blueprint in BlueprintInstance.objects.exclude(path__startswith=OCI_PREFIX):
        try:
            blueprint.retrieve()
        except BlueprintRetrievalFailed:
            blueprint.delete()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@dataclass
class
BlueprintFile:
@dataclass
class BlueprintFile:
    """Summary record describing one blueprint file found on disk"""

    # Location of the file, relative to the blueprints directory
    path: str
    # Blueprint format version declared by the file
    version: int
    # Hex SHA-512 digest of the file's bytes
    hash: str
    # File modification time as an integer timestamp
    last_m: int
    # Parsed metadata section; None when the file does not provide one
    meta: BlueprintMetadata | None = None
Basic info about a blueprint file
BlueprintFile( path: str, version: int, hash: str, last_m: int, meta: authentik.blueprints.v1.common.BlueprintMetadata | None = None)
class
BlueprintWatcherMiddleware(dramatiq.middleware.middleware.Middleware):
class BlueprintWatcherMiddleware(Middleware):
    """Broker middleware that launches the blueprint file watcher after worker boot"""

    def start_blueprint_watcher(self):
        """Create an observer on the blueprints directory and start it"""
        # The event filter is only supplied on Linux; elsewhere no filter is passed.
        watch_options = (
            {"event_filter": (FileCreatedEvent, FileModifiedEvent)}
            if platform.startswith("linux")
            else {}
        )
        watcher = Observer()
        watcher.schedule(
            BlueprintEventHandler(),
            CONFIG.get("blueprints_dir"),
            recursive=True,
            **watch_options,
        )
        watcher.start()

    def after_worker_boot(self, broker, worker):
        """Start the watcher once the worker is up, except when ``settings.TEST`` is set"""
        if settings.TEST:
            return
        self.start_blueprint_watcher()
Base class for broker middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.
def
start_blueprint_watcher(self):
def start_blueprint_watcher(self):
    """Create an observer on the blueprints directory and start it"""
    # The event filter is only supplied on Linux; elsewhere no filter is passed.
    watch_options = (
        {"event_filter": (FileCreatedEvent, FileModifiedEvent)}
        if platform.startswith("linux")
        else {}
    )
    watcher = Observer()
    watcher.schedule(
        BlueprintEventHandler(),
        CONFIG.get("blueprints_dir"),
        recursive=True,
        **watch_options,
    )
    watcher.start()
Start blueprint watcher
class
BlueprintEventHandler(watchdog.events.FileSystemEventHandler):
class BlueprintEventHandler(FileSystemEventHandler):
    """Event handler for blueprint events"""

    # On Linux we only ever get creation and modification events.
    # See the creation of the Observer instance above for the event filtering.

    # Even though we filter to only get file events, we might still get
    # directory events as some implementations such as inotify do not support
    # filtering on file/directory.

    def dispatch(self, event: FileSystemEvent) -> None:
        """Call specific event handler method. Ignores directory changes."""
        if event.is_directory:
            return None
        return super().dispatch(event)

    def on_created(self, event: FileSystemEvent):
        """Process file creation"""
        LOGGER.debug("new blueprint file created, starting discovery")
        for tenant in Tenant.objects.filter(ready=True):
            with tenant:
                Schedule.dispatch_by_actor(blueprints_discovery)

    def on_modified(self, event: FileSystemEvent):
        """Process file modification

        Queues ``apply_blueprint`` for every enabled instance, in every ready
        tenant, whose path matches the modified file.
        """
        root = Path(CONFIG.get("blueprints_dir")).absolute()
        try:
            # Normalise the event path the same way as the root: the watcher is
            # scheduled with the configured (possibly relative) directory, and
            # relative_to() raises ValueError when only one side is absolute.
            rel_path = str(Path(event.src_path).absolute().relative_to(root))
        except ValueError:
            # Raising here would propagate out of the handler into watchdog's
            # dispatching code, so events outside the root are skipped instead.
            LOGGER.debug(
                "ignoring modified file outside of blueprints dir", path=str(event.src_path)
            )
            return
        for tenant in Tenant.objects.filter(ready=True):
            with tenant:
                for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
                    LOGGER.debug("modified blueprint file, starting apply", instance=instance)
                    apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
Event handler for blueprint events
def
dispatch(self, event: watchdog.events.FileSystemEvent) -> None:
def dispatch(self, event: FileSystemEvent) -> None:
    """Forward file events to the matching handler method; directory events are dropped."""
    if not event.is_directory:
        return super().dispatch(event)
    return None
Call specific event handler method. Ignores directory changes.
def
on_created(self, event: watchdog.events.FileSystemEvent):
def on_created(self, event: FileSystemEvent):
    """Trigger blueprint discovery in every ready tenant when a file is created"""
    LOGGER.debug("new blueprint file created, starting discovery")
    ready_tenants = Tenant.objects.filter(ready=True)
    for ready_tenant in ready_tenants:
        # Each dispatch happens inside the tenant's own context
        with ready_tenant:
            Schedule.dispatch_by_actor(blueprints_discovery)
Process file creation
def
on_modified(self, event: watchdog.events.FileSystemEvent):
def on_modified(self, event: FileSystemEvent):
    """Process file modification

    Queues ``apply_blueprint`` for every enabled instance, in every ready
    tenant, whose path matches the modified file.
    """
    root = Path(CONFIG.get("blueprints_dir")).absolute()
    try:
        # Normalise the event path the same way as the root: the watcher is
        # scheduled with the configured (possibly relative) directory, and
        # relative_to() raises ValueError when only one side is absolute.
        rel_path = str(Path(event.src_path).absolute().relative_to(root))
    except ValueError:
        # Raising here would propagate out of the handler into watchdog's
        # dispatching code, so events outside the root are skipped instead.
        LOGGER.debug(
            "ignoring modified file outside of blueprints dir", path=str(event.src_path)
        )
        return
    for tenant in Tenant.objects.filter(ready=True):
        with tenant:
            for instance in BlueprintInstance.objects.filter(path=rel_path, enabled=True):
                LOGGER.debug("modified blueprint file, starting apply", instance=instance)
                apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
Process file modification
def blueprints_find() -> list[BlueprintFile]:
    """Find blueprints and return valid ones

    Searches the configured ``blueprints_dir`` recursively for ``*.yaml`` files.
    Files are skipped when they live below a dot-prefixed path component, fail
    to parse, are empty, are not a YAML mapping, or declare a version other
    than 1.
    """
    blueprints = []
    root = Path(CONFIG.get("blueprints_dir"))
    for path in root.rglob("**/*.yaml"):
        rel_path = path.relative_to(root)
        # Only components below the root decide whether a file is hidden; checking
        # the full path would hide everything when the root itself contains a
        # dot-prefixed component (e.g. "~/.config/blueprints" or "../blueprints").
        if any(part.startswith(".") for part in rel_path.parts):
            continue
        with open(path, encoding="utf-8") as blueprint_file:
            try:
                # NOTE(review): safety of load() depends on BlueprintLoader being
                # a safe loader - confirm in authentik.blueprints.v1.common.
                raw_blueprint = load(blueprint_file.read(), BlueprintLoader)
            except YAMLError as exc:
                raw_blueprint = None
                LOGGER.warning("failed to parse blueprint", exc=exc, path=str(rel_path))
        if not raw_blueprint:
            continue
        # Valid YAML may still be a list or scalar, which has no .get(); skip it
        # instead of aborting the whole discovery with an AttributeError.
        if not isinstance(raw_blueprint, dict):
            LOGGER.warning("blueprint is not a mapping", path=str(rel_path))
            continue
        metadata = raw_blueprint.get("metadata", None)
        version = raw_blueprint.get("version", 1)
        if version != 1:
            LOGGER.warning("invalid blueprint version", version=version, path=str(rel_path))
            continue
        file_hash = sha512(path.read_bytes()).hexdigest()
        blueprint = BlueprintFile(str(rel_path), version, file_hash, int(path.stat().st_mtime))
        blueprint.meta = from_dict(BlueprintMetadata, metadata) if metadata else None
        blueprints.append(blueprint)
    return blueprints
Find blueprints and return valid ones
def check_blueprint_v1_file(blueprint: BlueprintFile):
    """Check if blueprint should be imported

    Creates a `BlueprintInstance` for the file when none exists for its path and
    queues `apply_blueprint` when the stored hash differs from the file's hash.
    """
    # Decide on the label first so that blueprints which must not be
    # instantiated do not cost a database query.
    if (
        blueprint.meta
        and blueprint.meta.labels.get(LABEL_AUTHENTIK_INSTANTIATE, "").lower() == "false"
    ):
        return
    # .first() yields None when no instance exists for this path yet
    instance: BlueprintInstance | None = BlueprintInstance.objects.filter(
        path=blueprint.path
    ).first()
    if not instance:
        instance = BlueprintInstance(
            name=blueprint.meta.name if blueprint.meta else str(blueprint.path),
            path=blueprint.path,
            context={},
            status=BlueprintInstanceStatus.UNKNOWN,
            enabled=True,
            managed_models=[],
            metadata={},
        )
        instance.save()
        LOGGER.info(
            "Creating new blueprint instance from file", instance=instance, path=instance.path
        )
    if instance.last_applied_hash != blueprint.hash:
        LOGGER.info("Applying blueprint due to changed file", instance=instance, path=instance.path)
        apply_blueprint.send_with_options(args=(instance.pk,), rel_obj=instance)
Check if blueprint should be imported