authentik.tasks.test
1from queue import PriorityQueue 2 3from django.utils.module_loading import import_string 4from django_dramatiq_postgres.conf import Conf 5from dramatiq import set_broker 6from dramatiq.broker import Broker, MessageProxy, get_broker 7from dramatiq.middleware.middleware import Middleware 8from dramatiq.middleware.retries import Retries 9from dramatiq.results.middleware import Results 10from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread 11 12from authentik.tasks.broker import PostgresBroker 13from authentik.tasks.middleware import WorkerHealthcheckMiddleware 14 15TESTING_QUEUE = "testing" 16 17 18class TestWorker(Worker): 19 def __init__(self, broker: Broker): 20 super().__init__(broker=broker) 21 self.work_queue = PriorityQueue() 22 self.consumers = { 23 TESTING_QUEUE: _ConsumerThread( 24 broker=self.broker, 25 queue_name=TESTING_QUEUE, 26 prefetch=2, 27 work_queue=self.work_queue, 28 worker_timeout=1, 29 ), 30 } 31 self.consumers[TESTING_QUEUE].consumer = self.broker.consume( 32 queue_name=TESTING_QUEUE, 33 prefetch=2, 34 timeout=1, 35 ) 36 self._worker = _WorkerThread( 37 broker=self.broker, 38 consumers=self.consumers, 39 work_queue=self.work_queue, 40 worker_timeout=1, 41 ) 42 43 self.broker.emit_before("worker_boot", self) 44 self.broker.emit_after("worker_boot", self) 45 self.broker.emit_after("process_boot") 46 47 def process_message(self, message: MessageProxy): 48 self.work_queue.put((0, message)) 49 self.consumers[TESTING_QUEUE].consumer.in_processing.add(message.message_id) 50 self._worker.process_message(message) 51 52 53class TestBroker(PostgresBroker): 54 worker: TestWorker | None = None 55 56 def start(self): 57 self.worker = TestWorker(broker=self) 58 59 def close(self): 60 self.emit_before("worker_shutdown", self) 61 return super().close() 62 63 def enqueue(self, *args, **kwargs): 64 message = super().enqueue(*args, **kwargs).copy(queue_name=TESTING_QUEUE) 65 if not self.worker: 66 return message 67 self.worker.process_message(MessageProxy(message)) 68 return message 69 70 71def use_test_broker(): 72 old_broker = get_broker() 73 74 broker = TestBroker() 75 76 for actor_name in old_broker.get_declared_actors(): 77 actor = old_broker.get_actor(actor_name) 78 actor.broker = broker 79 actor.broker.declare_actor(actor) 80 81 for middleware_class, middleware_kwargs in Conf().middlewares: 82 middleware: Middleware = import_string(middleware_class)( 83 **middleware_kwargs, 84 ) 85 if isinstance(middleware, WorkerHealthcheckMiddleware): 86 middleware.port = 9102 87 if isinstance(middleware, Retries): 88 middleware.max_retries = 0 89 if isinstance(middleware, Results): 90 middleware.backend = import_string(Conf().result_backend)( 91 *Conf().result_backend_args, 92 **Conf().result_backend_kwargs, 93 ) 94 broker.add_middleware(middleware) 95 96 broker.start() 97 set_broker(broker) 98 return broker
19class TestWorker(Worker): 20 def __init__(self, broker: Broker): 21 super().__init__(broker=broker) 22 self.work_queue = PriorityQueue() 23 self.consumers = { 24 TESTING_QUEUE: _ConsumerThread( 25 broker=self.broker, 26 queue_name=TESTING_QUEUE, 27 prefetch=2, 28 work_queue=self.work_queue, 29 worker_timeout=1, 30 ), 31 } 32 self.consumers[TESTING_QUEUE].consumer = self.broker.consume( 33 queue_name=TESTING_QUEUE, 34 prefetch=2, 35 timeout=1, 36 ) 37 self._worker = _WorkerThread( 38 broker=self.broker, 39 consumers=self.consumers, 40 work_queue=self.work_queue, 41 worker_timeout=1, 42 ) 43 44 self.broker.emit_before("worker_boot", self) 45 self.broker.emit_after("worker_boot", self) 46 self.broker.emit_after("process_boot") 47 48 def process_message(self, message: MessageProxy): 49 self.work_queue.put((0, message)) 50 self.consumers[TESTING_QUEUE].consumer.in_processing.add(message.message_id) 51 self._worker.process_message(message)
Workers consume messages off of all declared queues and distribute those messages to individual worker threads for processing. Workers don't block the current thread so it's up to the caller to keep it alive.
Don't run more than one Worker per process.
Parameters: broker(Broker) queues(Set[str]): An optional subset of queues to listen on. By default, if this is not provided, the worker will listen on all declared queues. worker_timeout(int): The number of milliseconds workers should wake up after if the queue is idle. worker_threads(int): The number of worker threads to spawn.
20 def __init__(self, broker: Broker): 21 super().__init__(broker=broker) 22 self.work_queue = PriorityQueue() 23 self.consumers = { 24 TESTING_QUEUE: _ConsumerThread( 25 broker=self.broker, 26 queue_name=TESTING_QUEUE, 27 prefetch=2, 28 work_queue=self.work_queue, 29 worker_timeout=1, 30 ), 31 } 32 self.consumers[TESTING_QUEUE].consumer = self.broker.consume( 33 queue_name=TESTING_QUEUE, 34 prefetch=2, 35 timeout=1, 36 ) 37 self._worker = _WorkerThread( 38 broker=self.broker, 39 consumers=self.consumers, 40 work_queue=self.work_queue, 41 worker_timeout=1, 42 ) 43 44 self.broker.emit_before("worker_boot", self) 45 self.broker.emit_after("worker_boot", self) 46 self.broker.emit_after("process_boot")
54class TestBroker(PostgresBroker): 55 worker: TestWorker | None = None 56 57 def start(self): 58 self.worker = TestWorker(broker=self) 59 60 def close(self): 61 self.emit_before("worker_shutdown", self) 62 return super().close() 63 64 def enqueue(self, *args, **kwargs): 65 message = super().enqueue(*args, **kwargs).copy(queue_name=TESTING_QUEUE) 66 if not self.worker: 67 return message 68 self.worker.process_message(MessageProxy(message)) 69 return message
Base class for broker implementations.
Parameters:
middleware(list[Middleware]): The set of middleware that apply
to this broker. If you supply this parameter, you are
expected to declare all middleware. Most of the time,
you'll want to use :meth:.add_middleware instead.
Attributes: actor_options(set[str]): The names of all the options actors may overwrite when they are declared.
64 def enqueue(self, *args, **kwargs): 65 message = super().enqueue(*args, **kwargs).copy(queue_name=TESTING_QUEUE) 66 if not self.worker: 67 return message 68 self.worker.process_message(MessageProxy(message)) 69 return message
Enqueue a message on this broker.
Parameters: message(Message): The message to enqueue. delay(int): The number of milliseconds to delay the message for.
Returns: Message: Either the original message or a copy of it.
72def use_test_broker(): 73 old_broker = get_broker() 74 75 broker = TestBroker() 76 77 for actor_name in old_broker.get_declared_actors(): 78 actor = old_broker.get_actor(actor_name) 79 actor.broker = broker 80 actor.broker.declare_actor(actor) 81 82 for middleware_class, middleware_kwargs in Conf().middlewares: 83 middleware: Middleware = import_string(middleware_class)( 84 **middleware_kwargs, 85 ) 86 if isinstance(middleware, WorkerHealthcheckMiddleware): 87 middleware.port = 9102 88 if isinstance(middleware, Retries): 89 middleware.max_retries = 0 90 if isinstance(middleware, Results): 91 middleware.backend = import_string(Conf().result_backend)( 92 *Conf().result_backend_args, 93 **Conf().result_backend_kwargs, 94 ) 95 broker.add_middleware(middleware) 96 97 broker.start() 98 set_broker(broker) 99 return broker