authentik.tasks.models
1from collections.abc import Iterable 2from typing import Self 3from uuid import UUID, uuid4 4 5import pgtrigger 6from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation 7from django.contrib.contenttypes.models import ContentType 8from django.db import models 9from django.utils.translation import gettext_lazy as _ 10from django_dramatiq_postgres.models import TaskBase, TaskState 11 12from authentik.events.logs import LogEvent 13from authentik.events.utils import sanitize_item 14from authentik.lib.models import InternallyManagedMixin, SerializerModel 15from authentik.lib.utils.errors import exception_to_dict 16from authentik.tenants.models import Tenant 17 18 19class TaskStatus(models.TextChoices): 20 """Task aggregated status. Reported by the task runners""" 21 22 QUEUED = TaskState.QUEUED 23 CONSUMED = TaskState.CONSUMED 24 PREPROCESS = TaskState.PREPROCESS 25 RUNNING = TaskState.RUNNING 26 POSTPROCESS = TaskState.POSTPROCESS 27 REJECTED = TaskState.REJECTED 28 DONE = TaskState.DONE 29 INFO = "info" 30 WARNING = "warning" 31 ERROR = "error" 32 33 34class Task(InternallyManagedMixin, SerializerModel, TaskBase): 35 tenant = models.ForeignKey( 36 Tenant, 37 on_delete=models.CASCADE, 38 help_text=_("Tenant this task belongs to"), 39 ) 40 41 rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) 42 rel_obj_id = models.TextField(null=True) 43 rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") 44 45 _uid = models.TextField(blank=True, null=True) 46 _messages = models.JSONField(default=list) 47 _previous_messages = models.JSONField(default=list) 48 49 aggregated_status = models.TextField(choices=TaskStatus.choices) 50 51 class Meta(TaskBase.Meta): 52 default_permissions = ("view",) 53 permissions = [ 54 ("retry_task", _("Retry failed task")), 55 ] 56 indexes = TaskBase.Meta.indexes + ( 57 models.Index(fields=("rel_obj_content_type", "rel_obj_id")), 58 ) 59 triggers = TaskBase.Meta.triggers + ( 60 pgtrigger.Trigger( 61 name="update_aggregated_status", 62 operation=pgtrigger.Insert | pgtrigger.Update, 63 when=pgtrigger.Before, 64 func=f""" 65 NEW.aggregated_status := CASE 66 WHEN NEW.state != '{TaskState.DONE.value}' THEN NEW.state 67 ELSE COALESCE(( 68 SELECT CASE 69 WHEN bool_or(msg->>'log_level' = 'error') THEN 'error' 70 WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning' 71 WHEN bool_or(msg->>'log_level' = 'info') THEN 'info' 72 ELSE '{TaskState.DONE.value}' 73 END 74 FROM jsonb_array_elements(NEW._messages) AS msg 75 ), '{TaskState.DONE.value}') 76 END; 77 78 RETURN NEW; 79 """, # nosec 80 ), 81 ) 82 83 @property 84 def uid(self) -> str: 85 uid = str(self.actor_name) 86 if self._uid: 87 uid += f":{self._uid}" 88 return uid 89 90 @property 91 def serializer(self): 92 from authentik.tasks.api.tasks import TaskSerializer 93 94 return TaskSerializer 95 96 def set_uid(self, uid: str | UUID, save: bool = False): 97 self._uid = str(uid) 98 if save: 99 self.save() 100 101 @classmethod 102 def _make_log( 103 cls, logger: str, log_level: TaskStatus, message: str | Exception, **attributes 104 ) -> LogEvent: 105 if isinstance(message, Exception): 106 attributes = { 107 "exception": exception_to_dict(message), 108 **attributes, 109 } 110 message = str(message) 111 return LogEvent( 112 message, 113 logger=logger, 114 log_level=log_level.value, 115 attributes=attributes, 116 ) 117 118 def logs(self, logs: Iterable[LogEvent]): 119 TaskLog.bulk_create_from_log_events(self, logs) 120 121 def log( 122 self, 123 logger: str, 124 log_level: TaskStatus, 125 message: str | Exception, 126 **attributes, 127 ) -> None: 128 TaskLog.create_from_log_event( 129 self, 130 self._make_log( 131 logger, 132 log_level, 133 message, 134 **attributes, 135 ), 136 ) 137 138 def info(self, message: str | Exception, **attributes) -> None: 139 self.log(self.uid, TaskStatus.INFO, message, **attributes) 140 141 def warning(self, message: str | Exception, **attributes) -> None: 142 self.log(self.uid, TaskStatus.WARNING, message, **attributes) 143 144 def error(self, message: str | Exception, **attributes) -> None: 145 self.log(self.uid, TaskStatus.ERROR, message, **attributes) 146 147 148class TaskLog(InternallyManagedMixin, models.Model): 149 id = models.UUIDField(default=uuid4, primary_key=True, editable=False) 150 151 task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="tasklogs") 152 event = models.TextField() 153 log_level = models.TextField() 154 logger = models.TextField() 155 timestamp = models.DateTimeField() 156 attributes = models.JSONField() 157 158 previous = models.BooleanField(default=False, db_index=True) 159 160 class Meta: 161 default_permissions = [] 162 verbose_name = _("Task log") 163 verbose_name_plural = _("Task logs") 164 indexes = (models.Index(fields=("task", "previous")),) 165 166 def __str__(self): 167 return str(self.pk) 168 169 @classmethod 170 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 171 if not task.message: 172 return None 173 return cls.objects.create( 174 task=task, 175 event=log_event.event, 176 log_level=log_event.log_level, 177 logger=log_event.logger, 178 timestamp=log_event.timestamp, 179 attributes=sanitize_item(log_event.attributes), 180 ) 181 182 @classmethod 183 def bulk_create_from_log_events( 184 cls, 185 task: Task, 186 log_events: Iterable[LogEvent], 187 ) -> list[Self] | None: 188 if not task.message: 189 return None 190 return cls.objects.bulk_create( 191 [ 192 cls( 193 task=task, 194 event=log_event.event, 195 log_level=log_event.log_level, 196 logger=log_event.logger, 197 timestamp=log_event.timestamp, 198 attributes=sanitize_item(log_event.attributes), 199 ) 200 for log_event in log_events 201 ] 202 ) 203 204 def to_log_event(self) -> LogEvent: 205 return LogEvent( 206 event=self.event, 207 log_level=self.log_level, 208 logger=self.logger, 209 timestamp=self.timestamp, 210 attributes=self.attributes, 211 ) 212 213 214class TasksModel(models.Model): 215 tasks = GenericRelation( 216 Task, content_type_field="rel_obj_content_type", object_id_field="rel_obj_id" 217 ) 218 219 class Meta: 220 abstract = True 221 222 223class WorkerStatus(models.Model): 224 id = models.UUIDField(primary_key=True, default=uuid4) 225 hostname = models.TextField() 226 version = models.TextField() 227 last_seen = models.DateTimeField(auto_now_add=True) 228 229 class Meta: 230 default_permissions = [] 231 verbose_name = _("Worker status") 232 verbose_name_plural = _("Worker statuses") 233 234 def __str__(self): 235 return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}"
20class TaskStatus(models.TextChoices): 21 """Task aggregated status. Reported by the task runners""" 22 23 QUEUED = TaskState.QUEUED 24 CONSUMED = TaskState.CONSUMED 25 PREPROCESS = TaskState.PREPROCESS 26 RUNNING = TaskState.RUNNING 27 POSTPROCESS = TaskState.POSTPROCESS 28 REJECTED = TaskState.REJECTED 29 DONE = TaskState.DONE 30 INFO = "info" 31 WARNING = "warning" 32 ERROR = "error"
Task aggregated status. Reported by the task runners
35class Task(InternallyManagedMixin, SerializerModel, TaskBase): 36 tenant = models.ForeignKey( 37 Tenant, 38 on_delete=models.CASCADE, 39 help_text=_("Tenant this task belongs to"), 40 ) 41 42 rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) 43 rel_obj_id = models.TextField(null=True) 44 rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") 45 46 _uid = models.TextField(blank=True, null=True) 47 _messages = models.JSONField(default=list) 48 _previous_messages = models.JSONField(default=list) 49 50 aggregated_status = models.TextField(choices=TaskStatus.choices) 51 52 class Meta(TaskBase.Meta): 53 default_permissions = ("view",) 54 permissions = [ 55 ("retry_task", _("Retry failed task")), 56 ] 57 indexes = TaskBase.Meta.indexes + ( 58 models.Index(fields=("rel_obj_content_type", "rel_obj_id")), 59 ) 60 triggers = TaskBase.Meta.triggers + ( 61 pgtrigger.Trigger( 62 name="update_aggregated_status", 63 operation=pgtrigger.Insert | pgtrigger.Update, 64 when=pgtrigger.Before, 65 func=f""" 66 NEW.aggregated_status := CASE 67 WHEN NEW.state != '{TaskState.DONE.value}' THEN NEW.state 68 ELSE COALESCE(( 69 SELECT CASE 70 WHEN bool_or(msg->>'log_level' = 'error') THEN 'error' 71 WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning' 72 WHEN bool_or(msg->>'log_level' = 'info') THEN 'info' 73 ELSE '{TaskState.DONE.value}' 74 END 75 FROM jsonb_array_elements(NEW._messages) AS msg 76 ), '{TaskState.DONE.value}') 77 END; 78 79 RETURN NEW; 80 """, # nosec 81 ), 82 ) 83 84 @property 85 def uid(self) -> str: 86 uid = str(self.actor_name) 87 if self._uid: 88 uid += f":{self._uid}" 89 return uid 90 91 @property 92 def serializer(self): 93 from authentik.tasks.api.tasks import TaskSerializer 94 95 return TaskSerializer 96 97 def set_uid(self, uid: str | UUID, save: bool = False): 98 self._uid = str(uid) 99 if save: 100 self.save() 101 102 @classmethod 103 def _make_log( 104 cls, logger: str, log_level: TaskStatus, message: str | Exception, **attributes 105 ) -> LogEvent: 106 if isinstance(message, Exception): 107 attributes = { 108 "exception": exception_to_dict(message), 109 **attributes, 110 } 111 message = str(message) 112 return LogEvent( 113 message, 114 logger=logger, 115 log_level=log_level.value, 116 attributes=attributes, 117 ) 118 119 def logs(self, logs: Iterable[LogEvent]): 120 TaskLog.bulk_create_from_log_events(self, logs) 121 122 def log( 123 self, 124 logger: str, 125 log_level: TaskStatus, 126 message: str | Exception, 127 **attributes, 128 ) -> None: 129 TaskLog.create_from_log_event( 130 self, 131 self._make_log( 132 logger, 133 log_level, 134 message, 135 **attributes, 136 ), 137 ) 138 139 def info(self, message: str | Exception, **attributes) -> None: 140 self.log(self.uid, TaskStatus.INFO, message, **attributes) 141 142 def warning(self, message: str | Exception, **attributes) -> None: 143 self.log(self.uid, TaskStatus.WARNING, message, **attributes) 144 145 def error(self, message: str | Exception, **attributes) -> None: 146 self.log(self.uid, TaskStatus.ERROR, message, **attributes)
Task(message_id, queue_name, actor_name, message, state, mtime, retries, eta, result, result_expiry, tenant, rel_obj_content_type, rel_obj_id, _uid, _messages, _previous_messages, aggregated_status)
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Provide a generic many-to-one relation through the content_type and
object_id fields.
This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
91 @property 92 def serializer(self): 93 from authentik.tasks.api.tasks import TaskSerializer 94 95 return TaskSerializer
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
149class TaskLog(InternallyManagedMixin, models.Model): 150 id = models.UUIDField(default=uuid4, primary_key=True, editable=False) 151 152 task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="tasklogs") 153 event = models.TextField() 154 log_level = models.TextField() 155 logger = models.TextField() 156 timestamp = models.DateTimeField() 157 attributes = models.JSONField() 158 159 previous = models.BooleanField(default=False, db_index=True) 160 161 class Meta: 162 default_permissions = [] 163 verbose_name = _("Task log") 164 verbose_name_plural = _("Task logs") 165 indexes = (models.Index(fields=("task", "previous")),) 166 167 def __str__(self): 168 return str(self.pk) 169 170 @classmethod 171 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 172 if not task.message: 173 return None 174 return cls.objects.create( 175 task=task, 176 event=log_event.event, 177 log_level=log_event.log_level, 178 logger=log_event.logger, 179 timestamp=log_event.timestamp, 180 attributes=sanitize_item(log_event.attributes), 181 ) 182 183 @classmethod 184 def bulk_create_from_log_events( 185 cls, 186 task: Task, 187 log_events: Iterable[LogEvent], 188 ) -> list[Self] | None: 189 if not task.message: 190 return None 191 return cls.objects.bulk_create( 192 [ 193 cls( 194 task=task, 195 event=log_event.event, 196 log_level=log_event.log_level, 197 logger=log_event.logger, 198 timestamp=log_event.timestamp, 199 attributes=sanitize_item(log_event.attributes), 200 ) 201 for log_event in log_events 202 ] 203 ) 204 205 def to_log_event(self) -> LogEvent: 206 return LogEvent( 207 event=self.event, 208 log_level=self.log_level, 209 logger=self.logger, 210 timestamp=self.timestamp, 211 attributes=self.attributes, 212 )
TaskLog(id, task, event, log_level, logger, timestamp, attributes, previous)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
170 @classmethod 171 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 172 if not task.message: 173 return None 174 return cls.objects.create( 175 task=task, 176 event=log_event.event, 177 log_level=log_event.log_level, 178 logger=log_event.logger, 179 timestamp=log_event.timestamp, 180 attributes=sanitize_item(log_event.attributes), 181 )
183 @classmethod 184 def bulk_create_from_log_events( 185 cls, 186 task: Task, 187 log_events: Iterable[LogEvent], 188 ) -> list[Self] | None: 189 if not task.message: 190 return None 191 return cls.objects.bulk_create( 192 [ 193 cls( 194 task=task, 195 event=log_event.event, 196 log_level=log_event.log_level, 197 logger=log_event.logger, 198 timestamp=log_event.timestamp, 199 attributes=sanitize_item(log_event.attributes), 200 ) 201 for log_event in log_events 202 ] 203 )
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.
215class TasksModel(models.Model): 216 tasks = GenericRelation( 217 Task, content_type_field="rel_obj_content_type", object_id_field="rel_obj_id" 218 ) 219 220 class Meta: 221 abstract = True
Make subclasses preserve the alters_data attribute on overridden methods.
224class WorkerStatus(models.Model): 225 id = models.UUIDField(primary_key=True, default=uuid4) 226 hostname = models.TextField() 227 version = models.TextField() 228 last_seen = models.DateTimeField(auto_now_add=True) 229 230 class Meta: 231 default_permissions = [] 232 verbose_name = _("Worker status") 233 verbose_name_plural = _("Worker statuses") 234 235 def __str__(self): 236 return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}"
WorkerStatus(id, hostname, version, last_seen)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.