authentik.tasks.schedules.common
1import pickle # nosec 2from collections.abc import Iterable 3from dataclasses import dataclass, field 4from typing import TYPE_CHECKING, Any 5from uuid import UUID 6 7from dramatiq.actor import Actor 8from psqlextra.types import ConflictAction 9 10if TYPE_CHECKING: 11 from authentik.tasks.schedules.models import Schedule 12 13 14@dataclass 15class ScheduleSpec: 16 actor: Actor 17 crontab: str 18 paused: bool = False 19 identifier: str | UUID | None = None 20 uid: str | None = None 21 22 args: Iterable[Any] = field(default_factory=tuple) 23 kwargs: dict[str, Any] = field(default_factory=dict) 24 options: dict[str, Any] = field(default_factory=dict) 25 26 rel_obj: Any | None = None 27 28 send_on_save: bool = False 29 30 send_on_startup: bool = False 31 32 def get_args(self) -> bytes: 33 return pickle.dumps(self.args) 34 35 def get_kwargs(self) -> bytes: 36 return pickle.dumps(self.kwargs) 37 38 def get_options(self) -> bytes: 39 options = self.options 40 if self.uid is not None: 41 options["uid"] = self.uid 42 return pickle.dumps(options) 43 44 def update_or_create(self) -> Schedule: 45 from django.contrib.contenttypes.models import ContentType 46 47 from authentik.tasks.schedules.models import Schedule 48 49 update_values = { 50 "_uid": self.uid, 51 "paused": self.paused, 52 "args": self.get_args(), 53 "kwargs": self.get_kwargs(), 54 "options": self.get_options(), 55 } 56 if self.rel_obj is not None: 57 update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj) 58 update_values["rel_obj_id"] = str(self.rel_obj.pk) 59 create_values = { 60 **update_values, 61 "crontab": self.crontab, 62 } 63 64 schedule = Schedule.objects.on_conflict( 65 ["actor_name", "identifier"], 66 ConflictAction.UPDATE, 67 update_values=update_values, 68 ).insert_and_get( 69 actor_name=self.actor.actor_name, 70 identifier=str(self.identifier), 71 **create_values, 72 ) 73 74 return schedule
@dataclass
class
ScheduleSpec:
15@dataclass 16class ScheduleSpec: 17 actor: Actor 18 crontab: str 19 paused: bool = False 20 identifier: str | UUID | None = None 21 uid: str | None = None 22 23 args: Iterable[Any] = field(default_factory=tuple) 24 kwargs: dict[str, Any] = field(default_factory=dict) 25 options: dict[str, Any] = field(default_factory=dict) 26 27 rel_obj: Any | None = None 28 29 send_on_save: bool = False 30 31 send_on_startup: bool = False 32 33 def get_args(self) -> bytes: 34 return pickle.dumps(self.args) 35 36 def get_kwargs(self) -> bytes: 37 return pickle.dumps(self.kwargs) 38 39 def get_options(self) -> bytes: 40 options = self.options 41 if self.uid is not None: 42 options["uid"] = self.uid 43 return pickle.dumps(options) 44 45 def update_or_create(self) -> Schedule: 46 from django.contrib.contenttypes.models import ContentType 47 48 from authentik.tasks.schedules.models import Schedule 49 50 update_values = { 51 "_uid": self.uid, 52 "paused": self.paused, 53 "args": self.get_args(), 54 "kwargs": self.get_kwargs(), 55 "options": self.get_options(), 56 } 57 if self.rel_obj is not None: 58 update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj) 59 update_values["rel_obj_id"] = str(self.rel_obj.pk) 60 create_values = { 61 **update_values, 62 "crontab": self.crontab, 63 } 64 65 schedule = Schedule.objects.on_conflict( 66 ["actor_name", "identifier"], 67 ConflictAction.UPDATE, 68 update_values=update_values, 69 ).insert_and_get( 70 actor_name=self.actor.actor_name, 71 identifier=str(self.identifier), 72 **create_values, 73 ) 74 75 return schedule
ScheduleSpec( actor: dramatiq.actor.Actor, crontab: str, paused: bool = False, identifier: str | uuid.UUID | None = None, uid: str | None = None, args: Iterable[typing.Any] = <factory>, kwargs: dict[str, typing.Any] = <factory>, options: dict[str, typing.Any] = <factory>, rel_obj: Any | None = None, send_on_save: bool = False, send_on_startup: bool = False)
def
update_or_create(unknown):
45 def update_or_create(self) -> Schedule: 46 from django.contrib.contenttypes.models import ContentType 47 48 from authentik.tasks.schedules.models import Schedule 49 50 update_values = { 51 "_uid": self.uid, 52 "paused": self.paused, 53 "args": self.get_args(), 54 "kwargs": self.get_kwargs(), 55 "options": self.get_options(), 56 } 57 if self.rel_obj is not None: 58 update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj) 59 update_values["rel_obj_id"] = str(self.rel_obj.pk) 60 create_values = { 61 **update_values, 62 "crontab": self.crontab, 63 } 64 65 schedule = Schedule.objects.on_conflict( 66 ["actor_name", "identifier"], 67 ConflictAction.UPDATE, 68 update_values=update_values, 69 ).insert_and_get( 70 actor_name=self.actor.actor_name, 71 identifier=str(self.identifier), 72 **create_values, 73 ) 74 75 return schedule