authentik.tasks.tests.test_worker_middleware

 1from django.test import TestCase
 2from dramatiq import actor, get_broker
 3
 4from authentik.tasks.middleware import CurrentTask
 5from authentik.tasks.models import Task, TaskLog
 6
 7
 8class TestWorkerMiddleware(TestCase):
 9
10    def test_task_log(self):
11        @actor
12        def test_task():
13            self = CurrentTask.get_task()
14            self.info("foo")
15
16        test_task.send()
17        task = Task.objects.filter(actor_name=test_task.actor_name).first()
18        logs = list(
19            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
20        )
21        self.assertEqual(
22            logs,
23            [
24                "Task has been queued",
25                "Task is being processed",
26                "foo",
27                "Task finished processing without errors",
28            ],
29        )
30        broker = get_broker()
31        del broker.actors[test_task.actor_name]
32
33    def test_task_exceptions(self):
34        @actor
35        def test_task():
36            raise ValueError("foo")
37
38        test_task.send()
39        task = Task.objects.filter(actor_name=test_task.actor_name).first()
40        logs = list(
41            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
42        )
43        self.assertEqual(
44            logs,
45            [
46                "Task has been queued",
47                "Task is being processed",
48                "foo",
49            ],
50        )
51        broker = get_broker()
52        del broker.actors[test_task.actor_name]
class TestWorkerMiddleware(django.test.testcases.TestCase):
 9class TestWorkerMiddleware(TestCase):
10
11    def test_task_log(self):
12        @actor
13        def test_task():
14            self = CurrentTask.get_task()
15            self.info("foo")
16
17        test_task.send()
18        task = Task.objects.filter(actor_name=test_task.actor_name).first()
19        logs = list(
20            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
21        )
22        self.assertEqual(
23            logs,
24            [
25                "Task has been queued",
26                "Task is being processed",
27                "foo",
28                "Task finished processing without errors",
29            ],
30        )
31        broker = get_broker()
32        del broker.actors[test_task.actor_name]
33
34    def test_task_exceptions(self):
35        @actor
36        def test_task():
37            raise ValueError("foo")
38
39        test_task.send()
40        task = Task.objects.filter(actor_name=test_task.actor_name).first()
41        logs = list(
42            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
43        )
44        self.assertEqual(
45            logs,
46            [
47                "Task has been queued",
48                "Task is being processed",
49                "foo",
50            ],
51        )
52        broker = get_broker()
53        del broker.actors[test_task.actor_name]

Similar to TransactionTestCase, but use transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def test_task_log(self):
11    def test_task_log(self):
12        @actor
13        def test_task():
14            self = CurrentTask.get_task()
15            self.info("foo")
16
17        test_task.send()
18        task = Task.objects.filter(actor_name=test_task.actor_name).first()
19        logs = list(
20            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
21        )
22        self.assertEqual(
23            logs,
24            [
25                "Task has been queued",
26                "Task is being processed",
27                "foo",
28                "Task finished processing without errors",
29            ],
30        )
31        broker = get_broker()
32        del broker.actors[test_task.actor_name]
def test_task_exceptions(self):
34    def test_task_exceptions(self):
35        @actor
36        def test_task():
37            raise ValueError("foo")
38
39        test_task.send()
40        task = Task.objects.filter(actor_name=test_task.actor_name).first()
41        logs = list(
42            TaskLog.objects.filter(task=task).order_by("timestamp").values_list("event", flat=True)
43        )
44        self.assertEqual(
45            logs,
46            [
47                "Task has been queued",
48                "Task is being processed",
49                "foo",
50            ],
51        )
52        broker = get_broker()
53        del broker.actors[test_task.actor_name]