authentik.tasks.schedules.common

 1import pickle  # nosec
 2from collections.abc import Iterable
 3from dataclasses import dataclass, field
 4from typing import TYPE_CHECKING, Any
 5from uuid import UUID
 6
 7from dramatiq.actor import Actor
 8from psqlextra.types import ConflictAction
 9
10if TYPE_CHECKING:
11    from authentik.tasks.schedules.models import Schedule
12
13
14@dataclass
15class ScheduleSpec:
16    actor: Actor
17    crontab: str
18    paused: bool = False
19    identifier: str | UUID | None = None
20    uid: str | None = None
21
22    args: Iterable[Any] = field(default_factory=tuple)
23    kwargs: dict[str, Any] = field(default_factory=dict)
24    options: dict[str, Any] = field(default_factory=dict)
25
26    rel_obj: Any | None = None
27
28    send_on_save: bool = False
29
30    send_on_startup: bool = False
31
32    def get_args(self) -> bytes:
33        return pickle.dumps(self.args)
34
35    def get_kwargs(self) -> bytes:
36        return pickle.dumps(self.kwargs)
37
38    def get_options(self) -> bytes:
39        options = self.options
40        if self.uid is not None:
41            options["uid"] = self.uid
42        return pickle.dumps(options)
43
44    def update_or_create(self) -> Schedule:
45        from django.contrib.contenttypes.models import ContentType
46
47        from authentik.tasks.schedules.models import Schedule
48
49        update_values = {
50            "_uid": self.uid,
51            "paused": self.paused,
52            "args": self.get_args(),
53            "kwargs": self.get_kwargs(),
54            "options": self.get_options(),
55        }
56        if self.rel_obj is not None:
57            update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj)
58            update_values["rel_obj_id"] = str(self.rel_obj.pk)
59        create_values = {
60            **update_values,
61            "crontab": self.crontab,
62        }
63
64        schedule = Schedule.objects.on_conflict(
65            ["actor_name", "identifier"],
66            ConflictAction.UPDATE,
67            update_values=update_values,
68        ).insert_and_get(
69            actor_name=self.actor.actor_name,
70            identifier=str(self.identifier),
71            **create_values,
72        )
73
74        return schedule
@dataclass
class ScheduleSpec:
15@dataclass
16class ScheduleSpec:
17    actor: Actor
18    crontab: str
19    paused: bool = False
20    identifier: str | UUID | None = None
21    uid: str | None = None
22
23    args: Iterable[Any] = field(default_factory=tuple)
24    kwargs: dict[str, Any] = field(default_factory=dict)
25    options: dict[str, Any] = field(default_factory=dict)
26
27    rel_obj: Any | None = None
28
29    send_on_save: bool = False
30
31    send_on_startup: bool = False
32
33    def get_args(self) -> bytes:
34        return pickle.dumps(self.args)
35
36    def get_kwargs(self) -> bytes:
37        return pickle.dumps(self.kwargs)
38
39    def get_options(self) -> bytes:
40        options = self.options
41        if self.uid is not None:
42            options["uid"] = self.uid
43        return pickle.dumps(options)
44
45    def update_or_create(self) -> Schedule:
46        from django.contrib.contenttypes.models import ContentType
47
48        from authentik.tasks.schedules.models import Schedule
49
50        update_values = {
51            "_uid": self.uid,
52            "paused": self.paused,
53            "args": self.get_args(),
54            "kwargs": self.get_kwargs(),
55            "options": self.get_options(),
56        }
57        if self.rel_obj is not None:
58            update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj)
59            update_values["rel_obj_id"] = str(self.rel_obj.pk)
60        create_values = {
61            **update_values,
62            "crontab": self.crontab,
63        }
64
65        schedule = Schedule.objects.on_conflict(
66            ["actor_name", "identifier"],
67            ConflictAction.UPDATE,
68            update_values=update_values,
69        ).insert_and_get(
70            actor_name=self.actor.actor_name,
71            identifier=str(self.identifier),
72            **create_values,
73        )
74
75        return schedule
ScheduleSpec( actor: dramatiq.actor.Actor, crontab: str, paused: bool = False, identifier: str | uuid.UUID | None = None, uid: str | None = None, args: Iterable[typing.Any] = <factory>, kwargs: dict[str, typing.Any] = <factory>, options: dict[str, typing.Any] = <factory>, rel_obj: Any | None = None, send_on_save: bool = False, send_on_startup: bool = False)
actor: dramatiq.actor.Actor
crontab: str
paused: bool = False
identifier: str | uuid.UUID | None = None
uid: str | None = None
args: Iterable[typing.Any]
kwargs: dict[str, typing.Any]
options: dict[str, typing.Any]
rel_obj: Any | None = None
send_on_save: bool = False
send_on_startup: bool = False
def get_args(self) -> bytes:
33    def get_args(self) -> bytes:
34        return pickle.dumps(self.args)
def get_kwargs(self) -> bytes:
36    def get_kwargs(self) -> bytes:
37        return pickle.dumps(self.kwargs)
def get_options(self) -> bytes:
39    def get_options(self) -> bytes:
40        options = self.options
41        if self.uid is not None:
42            options["uid"] = self.uid
43        return pickle.dumps(options)
def update_or_create(unknown):
45    def update_or_create(self) -> Schedule:
46        from django.contrib.contenttypes.models import ContentType
47
48        from authentik.tasks.schedules.models import Schedule
49
50        update_values = {
51            "_uid": self.uid,
52            "paused": self.paused,
53            "args": self.get_args(),
54            "kwargs": self.get_kwargs(),
55            "options": self.get_options(),
56        }
57        if self.rel_obj is not None:
58            update_values["rel_obj_content_type"] = ContentType.objects.get_for_model(self.rel_obj)
59            update_values["rel_obj_id"] = str(self.rel_obj.pk)
60        create_values = {
61            **update_values,
62            "crontab": self.crontab,
63        }
64
65        schedule = Schedule.objects.on_conflict(
66            ["actor_name", "identifier"],
67            ConflictAction.UPDATE,
68            update_values=update_values,
69        ).insert_and_get(
70            actor_name=self.actor.actor_name,
71            identifier=str(self.identifier),
72            **create_values,
73        )
74
75        return schedule