authentik.tasks.models
1from collections.abc import Iterable 2from typing import Self 3from uuid import UUID, uuid4 4 5import pgtrigger 6from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation 7from django.contrib.contenttypes.models import ContentType 8from django.db import models 9from django.utils.translation import gettext_lazy as _ 10from django_dramatiq_postgres.models import TaskBase, TaskState 11from dramatiq.errors import Retry 12 13from authentik.events.logs import LogEvent 14from authentik.events.utils import sanitize_item 15from authentik.lib.models import InternallyManagedMixin, SerializerModel 16from authentik.lib.utils.errors import exception_to_dict 17from authentik.tenants.models import Tenant 18 19 20class TaskStatus(models.TextChoices): 21 """Task aggregated status. Reported by the task runners""" 22 23 QUEUED = TaskState.QUEUED 24 CONSUMED = TaskState.CONSUMED 25 PREPROCESS = TaskState.PREPROCESS 26 RUNNING = TaskState.RUNNING 27 POSTPROCESS = TaskState.POSTPROCESS 28 REJECTED = TaskState.REJECTED 29 DONE = TaskState.DONE 30 INFO = "info" 31 WARNING = "warning" 32 ERROR = "error" 33 34 35class Task(InternallyManagedMixin, SerializerModel, TaskBase): 36 tenant = models.ForeignKey( 37 Tenant, 38 on_delete=models.CASCADE, 39 help_text=_("Tenant this task belongs to"), 40 ) 41 42 rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) 43 rel_obj_id = models.TextField(null=True) 44 rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") 45 46 _uid = models.TextField(blank=True, null=True) 47 _messages = models.JSONField(default=list) 48 _previous_messages = models.JSONField(default=list) 49 50 aggregated_status = models.TextField(choices=TaskStatus.choices) 51 52 class Meta(TaskBase.Meta): 53 default_permissions = ("view",) 54 permissions = [ 55 ("retry_task", _("Retry failed task")), 56 ] 57 indexes = TaskBase.Meta.indexes + ( 58 models.Index(fields=("rel_obj_content_type", "rel_obj_id")), 59 ) 60 triggers = TaskBase.Meta.triggers + ( 61 pgtrigger.Trigger( 62 name="update_aggregated_status", 63 operation=pgtrigger.Insert | pgtrigger.Update, 64 when=pgtrigger.Before, 65 func=f""" 66 NEW.aggregated_status := CASE 67 WHEN NEW.state != '{TaskState.DONE.value}' THEN NEW.state 68 ELSE COALESCE(( 69 SELECT CASE 70 WHEN bool_or(msg->>'log_level' = 'error') THEN 'error' 71 WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning' 72 WHEN bool_or(msg->>'log_level' = 'info') THEN 'info' 73 ELSE '{TaskState.DONE.value}' 74 END 75 FROM jsonb_array_elements(NEW._messages) AS msg 76 ), '{TaskState.DONE.value}') 77 END; 78 79 RETURN NEW; 80 """, # nosec 81 ), 82 ) 83 84 @property 85 def uid(self) -> str: 86 uid = str(self.actor_name) 87 if self._uid: 88 uid += f":{self._uid}" 89 return uid 90 91 @property 92 def serializer(self): 93 from authentik.tasks.api.tasks import TaskSerializer 94 95 return TaskSerializer 96 97 def set_uid(self, uid: str | UUID, save: bool = False): 98 self._uid = str(uid) 99 if save: 100 self.save() 101 102 @classmethod 103 def _make_log( 104 cls, logger: str, log_level: TaskStatus, message: str | Exception, **attributes 105 ) -> LogEvent: 106 if isinstance(message, Exception): 107 exc = message 108 attributes = { 109 "exception": exception_to_dict(exc), 110 **attributes, 111 } 112 message = str(message) 113 if not message and isinstance(exc, Retry): 114 message = "Task has encountered an error and will be retried" 115 return LogEvent( 116 message, 117 logger=logger, 118 log_level=log_level.value, 119 attributes=attributes, 120 ) 121 122 def logs(self, logs: Iterable[LogEvent]): 123 TaskLog.bulk_create_from_log_events(self, logs) 124 125 def log( 126 self, 127 logger: str, 128 log_level: TaskStatus, 129 message: str | Exception, 130 **attributes, 131 ) -> None: 132 TaskLog.create_from_log_event( 133 self, 134 self._make_log( 135 logger, 136 log_level, 137 message, 138 **attributes, 139 ), 140 ) 141 142 def info(self, message: str | Exception, **attributes) -> None: 143 self.log(self.uid, TaskStatus.INFO, message, **attributes) 144 145 def warning(self, message: str | Exception, **attributes) -> None: 146 self.log(self.uid, TaskStatus.WARNING, message, **attributes) 147 148 def error(self, message: str | Exception, **attributes) -> None: 149 self.log(self.uid, TaskStatus.ERROR, message, **attributes) 150 151 152class TaskLog(InternallyManagedMixin, models.Model): 153 id = models.UUIDField(default=uuid4, primary_key=True, editable=False) 154 155 task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="tasklogs") 156 event = models.TextField() 157 log_level = models.TextField() 158 logger = models.TextField() 159 timestamp = models.DateTimeField() 160 attributes = models.JSONField() 161 162 previous = models.BooleanField(default=False, db_index=True) 163 164 class Meta: 165 default_permissions = [] 166 verbose_name = _("Task log") 167 verbose_name_plural = _("Task logs") 168 indexes = (models.Index(fields=("task", "previous")),) 169 170 def __str__(self): 171 return str(self.pk) 172 173 @classmethod 174 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 175 if not task.message: 176 return None 177 return cls.objects.create( 178 task=task, 179 event=log_event.event, 180 log_level=log_event.log_level, 181 logger=log_event.logger, 182 timestamp=log_event.timestamp, 183 attributes=sanitize_item(log_event.attributes), 184 ) 185 186 @classmethod 187 def bulk_create_from_log_events( 188 cls, 189 task: Task, 190 log_events: Iterable[LogEvent], 191 ) -> list[Self] | None: 192 if not task.message: 193 return None 194 return cls.objects.bulk_create( 195 [ 196 cls( 197 task=task, 198 event=log_event.event, 199 log_level=log_event.log_level, 200 logger=log_event.logger, 201 timestamp=log_event.timestamp, 202 attributes=sanitize_item(log_event.attributes), 203 ) 204 for log_event in log_events 205 ] 206 ) 207 208 def to_log_event(self) -> LogEvent: 209 return LogEvent( 210 event=self.event, 211 log_level=self.log_level, 212 logger=self.logger, 213 timestamp=self.timestamp, 214 attributes=self.attributes, 215 ) 216 217 218class TasksModel(models.Model): 219 tasks = GenericRelation( 220 Task, content_type_field="rel_obj_content_type", object_id_field="rel_obj_id" 221 ) 222 223 class Meta: 224 abstract = True 225 226 227class WorkerStatus(models.Model): 228 id = models.UUIDField(primary_key=True, default=uuid4) 229 hostname = models.TextField() 230 version = models.TextField() 231 last_seen = models.DateTimeField(auto_now_add=True) 232 233 class Meta: 234 default_permissions = [] 235 verbose_name = _("Worker status") 236 verbose_name_plural = _("Worker statuses") 237 238 def __str__(self): 239 return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}"
21class TaskStatus(models.TextChoices): 22 """Task aggregated status. Reported by the task runners""" 23 24 QUEUED = TaskState.QUEUED 25 CONSUMED = TaskState.CONSUMED 26 PREPROCESS = TaskState.PREPROCESS 27 RUNNING = TaskState.RUNNING 28 POSTPROCESS = TaskState.POSTPROCESS 29 REJECTED = TaskState.REJECTED 30 DONE = TaskState.DONE 31 INFO = "info" 32 WARNING = "warning" 33 ERROR = "error"
Task aggregated status. Reported by the task runners
36class Task(InternallyManagedMixin, SerializerModel, TaskBase): 37 tenant = models.ForeignKey( 38 Tenant, 39 on_delete=models.CASCADE, 40 help_text=_("Tenant this task belongs to"), 41 ) 42 43 rel_obj_content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE, null=True) 44 rel_obj_id = models.TextField(null=True) 45 rel_obj = GenericForeignKey("rel_obj_content_type", "rel_obj_id") 46 47 _uid = models.TextField(blank=True, null=True) 48 _messages = models.JSONField(default=list) 49 _previous_messages = models.JSONField(default=list) 50 51 aggregated_status = models.TextField(choices=TaskStatus.choices) 52 53 class Meta(TaskBase.Meta): 54 default_permissions = ("view",) 55 permissions = [ 56 ("retry_task", _("Retry failed task")), 57 ] 58 indexes = TaskBase.Meta.indexes + ( 59 models.Index(fields=("rel_obj_content_type", "rel_obj_id")), 60 ) 61 triggers = TaskBase.Meta.triggers + ( 62 pgtrigger.Trigger( 63 name="update_aggregated_status", 64 operation=pgtrigger.Insert | pgtrigger.Update, 65 when=pgtrigger.Before, 66 func=f""" 67 NEW.aggregated_status := CASE 68 WHEN NEW.state != '{TaskState.DONE.value}' THEN NEW.state 69 ELSE COALESCE(( 70 SELECT CASE 71 WHEN bool_or(msg->>'log_level' = 'error') THEN 'error' 72 WHEN bool_or(msg->>'log_level' = 'warning') THEN 'warning' 73 WHEN bool_or(msg->>'log_level' = 'info') THEN 'info' 74 ELSE '{TaskState.DONE.value}' 75 END 76 FROM jsonb_array_elements(NEW._messages) AS msg 77 ), '{TaskState.DONE.value}') 78 END; 79 80 RETURN NEW; 81 """, # nosec 82 ), 83 ) 84 85 @property 86 def uid(self) -> str: 87 uid = str(self.actor_name) 88 if self._uid: 89 uid += f":{self._uid}" 90 return uid 91 92 @property 93 def serializer(self): 94 from authentik.tasks.api.tasks import TaskSerializer 95 96 return TaskSerializer 97 98 def set_uid(self, uid: str | UUID, save: bool = False): 99 self._uid = str(uid) 100 if save: 101 self.save() 102 103 @classmethod 104 def _make_log( 105 cls, logger: str, log_level: TaskStatus, message: str | Exception, **attributes 106 ) -> LogEvent: 107 if isinstance(message, Exception): 108 exc = message 109 attributes = { 110 "exception": exception_to_dict(exc), 111 **attributes, 112 } 113 message = str(message) 114 if not message and isinstance(exc, Retry): 115 message = "Task has encountered an error and will be retried" 116 return LogEvent( 117 message, 118 logger=logger, 119 log_level=log_level.value, 120 attributes=attributes, 121 ) 122 123 def logs(self, logs: Iterable[LogEvent]): 124 TaskLog.bulk_create_from_log_events(self, logs) 125 126 def log( 127 self, 128 logger: str, 129 log_level: TaskStatus, 130 message: str | Exception, 131 **attributes, 132 ) -> None: 133 TaskLog.create_from_log_event( 134 self, 135 self._make_log( 136 logger, 137 log_level, 138 message, 139 **attributes, 140 ), 141 ) 142 143 def info(self, message: str | Exception, **attributes) -> None: 144 self.log(self.uid, TaskStatus.INFO, message, **attributes) 145 146 def warning(self, message: str | Exception, **attributes) -> None: 147 self.log(self.uid, TaskStatus.WARNING, message, **attributes) 148 149 def error(self, message: str | Exception, **attributes) -> None: 150 self.log(self.uid, TaskStatus.ERROR, message, **attributes)
Task(message_id, queue_name, actor_name, message, state, mtime, retries, eta, result, result_expiry, tenant, rel_obj_content_type, rel_obj_id, _uid, _messages, _previous_messages, aggregated_status)
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Provide a generic many-to-one relation through the content_type and
object_id fields.
This class also doubles as an accessor to the related object (similar to ForwardManyToOneDescriptor) by adding itself as a model attribute.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
92 @property 93 def serializer(self): 94 from authentik.tasks.api.tasks import TaskSerializer 95 96 return TaskSerializer
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related objects manager on the reverse side of a many-to-one relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Parent.children is a ReverseManyToOneDescriptor instance.
Most of the implementation is delegated to a dynamically defined manager
class built by create_forward_many_to_many_manager() defined below.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.
153class TaskLog(InternallyManagedMixin, models.Model): 154 id = models.UUIDField(default=uuid4, primary_key=True, editable=False) 155 156 task = models.ForeignKey(Task, on_delete=models.CASCADE, related_name="tasklogs") 157 event = models.TextField() 158 log_level = models.TextField() 159 logger = models.TextField() 160 timestamp = models.DateTimeField() 161 attributes = models.JSONField() 162 163 previous = models.BooleanField(default=False, db_index=True) 164 165 class Meta: 166 default_permissions = [] 167 verbose_name = _("Task log") 168 verbose_name_plural = _("Task logs") 169 indexes = (models.Index(fields=("task", "previous")),) 170 171 def __str__(self): 172 return str(self.pk) 173 174 @classmethod 175 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 176 if not task.message: 177 return None 178 return cls.objects.create( 179 task=task, 180 event=log_event.event, 181 log_level=log_event.log_level, 182 logger=log_event.logger, 183 timestamp=log_event.timestamp, 184 attributes=sanitize_item(log_event.attributes), 185 ) 186 187 @classmethod 188 def bulk_create_from_log_events( 189 cls, 190 task: Task, 191 log_events: Iterable[LogEvent], 192 ) -> list[Self] | None: 193 if not task.message: 194 return None 195 return cls.objects.bulk_create( 196 [ 197 cls( 198 task=task, 199 event=log_event.event, 200 log_level=log_event.log_level, 201 logger=log_event.logger, 202 timestamp=log_event.timestamp, 203 attributes=sanitize_item(log_event.attributes), 204 ) 205 for log_event in log_events 206 ] 207 ) 208 209 def to_log_event(self) -> LogEvent: 210 return LogEvent( 211 event=self.event, 212 log_level=self.log_level, 213 logger=self.logger, 214 timestamp=self.timestamp, 215 attributes=self.attributes, 216 )
TaskLog(id, task, event, log_level, logger, timestamp, attributes, previous)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
174 @classmethod 175 def create_from_log_event(cls, task: Task, log_event: LogEvent) -> Self | None: 176 if not task.message: 177 return None 178 return cls.objects.create( 179 task=task, 180 event=log_event.event, 181 log_level=log_event.log_level, 182 logger=log_event.logger, 183 timestamp=log_event.timestamp, 184 attributes=sanitize_item(log_event.attributes), 185 )
187 @classmethod 188 def bulk_create_from_log_events( 189 cls, 190 task: Task, 191 log_events: Iterable[LogEvent], 192 ) -> list[Self] | None: 193 if not task.message: 194 return None 195 return cls.objects.bulk_create( 196 [ 197 cls( 198 task=task, 199 event=log_event.event, 200 log_level=log_event.log_level, 201 logger=log_event.logger, 202 timestamp=log_event.timestamp, 203 attributes=sanitize_item(log_event.attributes), 204 ) 205 for log_event in log_events 206 ] 207 )
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.
219class TasksModel(models.Model): 220 tasks = GenericRelation( 221 Task, content_type_field="rel_obj_content_type", object_id_field="rel_obj_id" 222 ) 223 224 class Meta: 225 abstract = True
Make subclasses preserve the alters_data attribute on overridden methods.
228class WorkerStatus(models.Model): 229 id = models.UUIDField(primary_key=True, default=uuid4) 230 hostname = models.TextField() 231 version = models.TextField() 232 last_seen = models.DateTimeField(auto_now_add=True) 233 234 class Meta: 235 default_permissions = [] 236 verbose_name = _("Worker status") 237 verbose_name_plural = _("Worker statuses") 238 239 def __str__(self): 240 return f"{self.id} - {self.hostname} - {self.version} - {self.last_seen}"
WorkerStatus(id, hostname, version, last_seen)
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.