authentik.tasks.test

 1from queue import PriorityQueue
 2
 3from django.utils.module_loading import import_string
 4from django_dramatiq_postgres.conf import Conf
 5from dramatiq import set_broker
 6from dramatiq.broker import Broker, MessageProxy, get_broker
 7from dramatiq.middleware.middleware import Middleware
 8from dramatiq.middleware.retries import Retries
 9from dramatiq.results.middleware import Results
10from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread
11
12from authentik.tasks.broker import PostgresBroker
13from authentik.tasks.middleware import WorkerHealthcheckMiddleware
14
15TESTING_QUEUE = "testing"
16
17
class TestWorker(Worker):
    """Worker used by the test broker to run messages on demand.

    The constructor wires up one consumer thread object and one worker
    thread object for ``TESTING_QUEUE``. This class itself never starts
    either of them; messages are pushed through ``process_message`` by the
    caller instead.
    """

    def __init__(self, broker: Broker):
        """Build the consumer/worker pair for ``TESTING_QUEUE`` and emit boot hooks.

        Args:
            broker: Broker the worker consumes from and emits hooks on.
        """
        super().__init__(broker=broker)
        self.work_queue = PriorityQueue()

        # Single consumer bound to the testing queue, sharing our work queue.
        consumer_thread = _ConsumerThread(
            broker=self.broker,
            queue_name=TESTING_QUEUE,
            prefetch=2,
            work_queue=self.work_queue,
            worker_timeout=1,
        )
        self.consumers = {TESTING_QUEUE: consumer_thread}

        # Attach the broker-side consumer by hand, since the consumer thread
        # is not started by this class.
        consumer_thread.consumer = self.broker.consume(
            queue_name=TESTING_QUEUE,
            prefetch=2,
            timeout=1,
        )

        self._worker = _WorkerThread(
            broker=self.broker,
            consumers=self.consumers,
            work_queue=self.work_queue,
            worker_timeout=1,
        )

        # Let middleware observe the same boot events it would see normally.
        self.broker.emit_before("worker_boot", self)
        self.broker.emit_after("worker_boot", self)
        self.broker.emit_after("process_boot")

    def process_message(self, message: MessageProxy):
        """Register ``message`` as in flight and process it via the worker thread object.

        Args:
            message: The message proxy to process.
        """
        # presumably keeps the queue's unfinished-task accounting balanced for
        # _WorkerThread.process_message - confirm against dramatiq
        self.work_queue.put((0, message))
        in_flight = self.consumers[TESTING_QUEUE].consumer.in_processing
        in_flight.add(message.message_id)
        self._worker.process_message(message)
51
52
class TestBroker(PostgresBroker):
    """Postgres broker variant that hands enqueued messages to a ``TestWorker``.

    Until ``start`` has been called there is no worker, and ``enqueue`` only
    returns the re-targeted message without processing it.
    """

    # Set by start(); None means enqueued messages are not processed.
    worker: TestWorker | None = None

    def start(self):
        """Create the ``TestWorker`` bound to this broker."""
        test_worker = TestWorker(broker=self)
        self.worker = test_worker

    def close(self):
        """Emit the ``worker_shutdown`` before-hook, then close the parent broker.

        Returns:
            Whatever the parent class's ``close`` returns.
        """
        # NOTE(review): the broker itself is passed as the hook argument here;
        # confirm whether self.worker was intended.
        self.emit_before("worker_shutdown", self)
        outcome = super().close()
        return outcome

    def enqueue(self, *args, **kwargs):
        """Enqueue through the parent broker and process the message if a worker exists.

        The message returned by the parent is copied onto ``TESTING_QUEUE``;
        that copy is what gets processed and returned.

        Returns:
            The copy of the enqueued message targeted at ``TESTING_QUEUE``.
        """
        enqueued = super().enqueue(*args, **kwargs)
        message = enqueued.copy(queue_name=TESTING_QUEUE)
        if self.worker:
            self.worker.process_message(MessageProxy(message))
        return message
69
70
def use_test_broker():
    """Replace the global dramatiq broker with a started ``TestBroker``.

    Every actor declared on the current broker is re-bound to the new one,
    and the middlewares listed in ``Conf().middlewares`` are instantiated and
    added with a few test-specific overrides.

    Returns:
        The new ``TestBroker``, already started and installed via ``set_broker``.
    """
    previous = get_broker()
    test_broker = TestBroker()

    # Move every declared actor over to the test broker.
    for name in previous.get_declared_actors():
        actor = previous.get_actor(name)
        actor.broker = test_broker
        test_broker.declare_actor(actor)

    for dotted_path, options in Conf().middlewares:
        middleware_cls = import_string(dotted_path)
        middleware: Middleware = middleware_cls(**options)

        # Test-specific overrides, applied per middleware type.
        if isinstance(middleware, WorkerHealthcheckMiddleware):
            middleware.port = 9102
        if isinstance(middleware, Retries):
            middleware.max_retries = 0
        if isinstance(middleware, Results):
            backend_cls = import_string(Conf().result_backend)
            middleware.backend = backend_cls(
                *Conf().result_backend_args,
                **Conf().result_backend_kwargs,
            )

        test_broker.add_middleware(middleware)

    test_broker.start()
    set_broker(test_broker)
    return test_broker
TESTING_QUEUE = 'testing'
class TestWorker(dramatiq.worker.Worker):
class TestWorker(Worker):
    """Worker used by the test broker to run messages on demand.

    The constructor wires up one consumer thread object and one worker
    thread object for ``TESTING_QUEUE``. This class itself never starts
    either of them; messages are pushed through ``process_message`` by the
    caller instead.
    """

    def __init__(self, broker: Broker):
        """Build the consumer/worker pair for ``TESTING_QUEUE`` and emit boot hooks.

        Args:
            broker: Broker the worker consumes from and emits hooks on.
        """
        super().__init__(broker=broker)
        self.work_queue = PriorityQueue()

        # Single consumer bound to the testing queue, sharing our work queue.
        consumer_thread = _ConsumerThread(
            broker=self.broker,
            queue_name=TESTING_QUEUE,
            prefetch=2,
            work_queue=self.work_queue,
            worker_timeout=1,
        )
        self.consumers = {TESTING_QUEUE: consumer_thread}

        # Attach the broker-side consumer by hand, since the consumer thread
        # is not started by this class.
        consumer_thread.consumer = self.broker.consume(
            queue_name=TESTING_QUEUE,
            prefetch=2,
            timeout=1,
        )

        self._worker = _WorkerThread(
            broker=self.broker,
            consumers=self.consumers,
            work_queue=self.work_queue,
            worker_timeout=1,
        )

        # Let middleware observe the same boot events it would see normally.
        self.broker.emit_before("worker_boot", self)
        self.broker.emit_after("worker_boot", self)
        self.broker.emit_after("process_boot")

    def process_message(self, message: MessageProxy):
        """Register ``message`` as in flight and process it via the worker thread object.

        Args:
            message: The message proxy to process.
        """
        # presumably keeps the queue's unfinished-task accounting balanced for
        # _WorkerThread.process_message - confirm against dramatiq
        self.work_queue.put((0, message))
        in_flight = self.consumers[TESTING_QUEUE].consumer.in_processing
        in_flight.add(message.message_id)
        self._worker.process_message(message)

Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don't block the current thread so it's up to the caller to keep it alive.

Don't run more than one Worker per process.

Parameters:
- broker (Broker)
- queues (Set[str]): An optional subset of queues to listen on. By default, if this is not provided, the worker will listen on all declared queues.
- worker_timeout (int): The number of milliseconds workers should wake up after if the queue is idle.
- worker_threads (int): The number of worker threads to spawn.

TestWorker(broker: dramatiq.broker.Broker)
20    def __init__(self, broker: Broker):
21        super().__init__(broker=broker)
22        self.work_queue = PriorityQueue()
23        self.consumers = {
24            TESTING_QUEUE: _ConsumerThread(
25                broker=self.broker,
26                queue_name=TESTING_QUEUE,
27                prefetch=2,
28                work_queue=self.work_queue,
29                worker_timeout=1,
30            ),
31        }
32        self.consumers[TESTING_QUEUE].consumer = self.broker.consume(
33            queue_name=TESTING_QUEUE,
34            prefetch=2,
35            timeout=1,
36        )
37        self._worker = _WorkerThread(
38            broker=self.broker,
39            consumers=self.consumers,
40            work_queue=self.work_queue,
41            worker_timeout=1,
42        )
43
44        self.broker.emit_before("worker_boot", self)
45        self.broker.emit_after("worker_boot", self)
46        self.broker.emit_after("process_boot")
work_queue
consumers
def process_message(self, message: dramatiq.broker.MessageProxy):
48    def process_message(self, message: MessageProxy):
49        self.work_queue.put((0, message))
50        self.consumers[TESTING_QUEUE].consumer.in_processing.add(message.message_id)
51        self._worker.process_message(message)
class TestBroker(django_dramatiq_postgres.broker.PostgresBroker):
class TestBroker(PostgresBroker):
    """Postgres broker variant that hands enqueued messages to a ``TestWorker``.

    Until ``start`` has been called there is no worker, and ``enqueue`` only
    returns the re-targeted message without processing it.
    """

    # Set by start(); None means enqueued messages are not processed.
    worker: TestWorker | None = None

    def start(self):
        """Create the ``TestWorker`` bound to this broker."""
        test_worker = TestWorker(broker=self)
        self.worker = test_worker

    def close(self):
        """Emit the ``worker_shutdown`` before-hook, then close the parent broker.

        Returns:
            Whatever the parent class's ``close`` returns.
        """
        # NOTE(review): the broker itself is passed as the hook argument here;
        # confirm whether self.worker was intended.
        self.emit_before("worker_shutdown", self)
        outcome = super().close()
        return outcome

    def enqueue(self, *args, **kwargs):
        """Enqueue through the parent broker and process the message if a worker exists.

        The message returned by the parent is copied onto ``TESTING_QUEUE``;
        that copy is what gets processed and returned.

        Returns:
            The copy of the enqueued message targeted at ``TESTING_QUEUE``.
        """
        enqueued = super().enqueue(*args, **kwargs)
        message = enqueued.copy(queue_name=TESTING_QUEUE)
        if self.worker:
            self.worker.process_message(MessageProxy(message))
        return message

Base class for broker implementations.

Parameters: middleware (list[Middleware]): The set of middleware that apply to this broker. If you supply this parameter, you are expected to declare all middleware. Most of the time, you'll want to use `add_middleware` instead.

Attributes: actor_options(set[str]): The names of all the options actors may overwrite when they are declared.

worker: TestWorker | None = None
def start(self):
57    def start(self):
58        self.worker = TestWorker(broker=self)
def close(self):
60    def close(self):
61        self.emit_before("worker_shutdown", self)
62        return super().close()

Close this broker and perform any necessary cleanup actions.

def enqueue(self, *args, **kwargs):
64    def enqueue(self, *args, **kwargs):
65        message = super().enqueue(*args, **kwargs).copy(queue_name=TESTING_QUEUE)
66        if not self.worker:
67            return message
68        self.worker.process_message(MessageProxy(message))
69        return message

Enqueue a message on this broker.

Parameters:
- message (Message): The message to enqueue.
- delay (int): The number of milliseconds to delay the message for.

Returns: Message: Either the original message or a copy of it.

def use_test_broker():
def use_test_broker():
    """Replace the global dramatiq broker with a started ``TestBroker``.

    Every actor declared on the current broker is re-bound to the new one,
    and the middlewares listed in ``Conf().middlewares`` are instantiated and
    added with a few test-specific overrides.

    Returns:
        The new ``TestBroker``, already started and installed via ``set_broker``.
    """
    previous = get_broker()
    test_broker = TestBroker()

    # Move every declared actor over to the test broker.
    for name in previous.get_declared_actors():
        actor = previous.get_actor(name)
        actor.broker = test_broker
        test_broker.declare_actor(actor)

    for dotted_path, options in Conf().middlewares:
        middleware_cls = import_string(dotted_path)
        middleware: Middleware = middleware_cls(**options)

        # Test-specific overrides, applied per middleware type.
        if isinstance(middleware, WorkerHealthcheckMiddleware):
            middleware.port = 9102
        if isinstance(middleware, Retries):
            middleware.max_retries = 0
        if isinstance(middleware, Results):
            backend_cls = import_string(Conf().result_backend)
            middleware.backend = backend_cls(
                *Conf().result_backend_args,
                **Conf().result_backend_kwargs,
            )

        test_broker.add_middleware(middleware)

    test_broker.start()
    set_broker(test_broker)
    return test_broker