authentik.enterprise.providers.ssf.tests.test_tasks

  1from jwt import decode_complete
  2from requests_mock import Mocker
  3from rest_framework.test import APITestCase
  4
  5from authentik.core.models import Application
  6from authentik.core.tests.utils import create_test_cert
  7from authentik.enterprise.providers.ssf.models import (
  8    DeliveryMethods,
  9    EventTypes,
 10    SSFProvider,
 11    Stream,
 12    StreamStatus,
 13)
 14from authentik.enterprise.providers.ssf.tasks import send_ssf_event
 15from authentik.lib.generators import generate_id
 16from authentik.tasks.models import TaskLog
 17
 18
 19class TestTasks(APITestCase):
 20    def setUp(self):
 21        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 22        self.provider = SSFProvider.objects.create(
 23            name=generate_id(),
 24            signing_key=create_test_cert(),
 25            backchannel_application=self.application,
 26        )
 27
 28    def test_push_simple(self):
 29        stream = Stream.objects.create(
 30            provider=self.provider,
 31            delivery_method=DeliveryMethods.RFC_PUSH,
 32            endpoint_url="http://localhost/ssf-push",
 33        )
 34        event_data = stream.prepare_event_payload(
 35            EventTypes.SET_VERIFICATION,
 36            {"state": None},
 37            sub_id={"format": "opaque", "id": str(stream.uuid)},
 38        )
 39        with Mocker() as mocker:
 40            mocker.post("http://localhost/ssf-push", status_code=202)
 41            send_ssf_event.send_with_options(
 42                args=(stream.pk, event_data), rel_obj=stream.provider
 43            ).get_result(block=True, timeout=1)
 44        self.assertEqual(
 45            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
 46        )
 47        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 48        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 49        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
 50
 51    def test_push_auth(self):
 52        auth = generate_id()
 53        stream = Stream.objects.create(
 54            provider=self.provider,
 55            delivery_method=DeliveryMethods.RFC_PUSH,
 56            endpoint_url="http://localhost/ssf-push",
 57            authorization_header=auth,
 58        )
 59        event_data = stream.prepare_event_payload(
 60            EventTypes.SET_VERIFICATION,
 61            {"state": None},
 62            sub_id={"format": "opaque", "id": str(stream.uuid)},
 63        )
 64        with Mocker() as mocker:
 65            mocker.post("http://localhost/ssf-push", status_code=202)
 66            send_ssf_event.send_with_options(
 67                args=(stream.pk, event_data), rel_obj=stream.provider
 68            ).get_result(block=True, timeout=1)
 69        self.assertEqual(mocker.request_history[0].headers["Authorization"], auth)
 70        self.assertEqual(
 71            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
 72        )
 73        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 74        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 75        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
 76
 77    def test_push_stream_disable(self):
 78        auth = generate_id()
 79        stream = Stream.objects.create(
 80            provider=self.provider,
 81            delivery_method=DeliveryMethods.RFC_PUSH,
 82            endpoint_url="http://localhost/ssf-push",
 83            authorization_header=auth,
 84            status=StreamStatus.DISABLED,
 85        )
 86        event_data = stream.prepare_event_payload(
 87            EventTypes.SET_VERIFICATION,
 88            {"state": None},
 89            sub_id={"format": "opaque", "id": str(stream.uuid)},
 90        )
 91        with Mocker() as mocker:
 92            mocker.post("http://localhost/ssf-push", status_code=202)
 93            send_ssf_event.send_with_options(
 94                args=(stream.pk, event_data), rel_obj=stream.provider
 95            ).get_result(block=True, timeout=1)
 96        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 97        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 98        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
 99        self.assertFalse(Stream.objects.filter(pk=stream.pk).exists())
100
101    def test_push_error(self):
102        stream = Stream.objects.create(
103            provider=self.provider,
104            delivery_method=DeliveryMethods.RFC_PUSH,
105            endpoint_url="http://localhost/ssf-push",
106        )
107        event_data = stream.prepare_event_payload(
108            EventTypes.SET_VERIFICATION,
109            {"state": None},
110            sub_id={"format": "opaque", "id": str(stream.uuid)},
111        )
112        with Mocker() as mocker:
113            mocker.post("http://localhost/ssf-push", text="error", status_code=400)
114            send_ssf_event.send_with_options(
115                args=(stream.pk, event_data), rel_obj=stream.provider
116            ).get_result(block=True, timeout=1)
117        logs = (
118            TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name)
119            .order_by("timestamp")
120            .filter(event="Failed to send request")
121            .first()
122        )
123        self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})
class TestTasks(rest_framework.test.APITestCase):
 20class TestTasks(APITestCase):
 21    def setUp(self):
 22        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 23        self.provider = SSFProvider.objects.create(
 24            name=generate_id(),
 25            signing_key=create_test_cert(),
 26            backchannel_application=self.application,
 27        )
 28
 29    def test_push_simple(self):
 30        stream = Stream.objects.create(
 31            provider=self.provider,
 32            delivery_method=DeliveryMethods.RFC_PUSH,
 33            endpoint_url="http://localhost/ssf-push",
 34        )
 35        event_data = stream.prepare_event_payload(
 36            EventTypes.SET_VERIFICATION,
 37            {"state": None},
 38            sub_id={"format": "opaque", "id": str(stream.uuid)},
 39        )
 40        with Mocker() as mocker:
 41            mocker.post("http://localhost/ssf-push", status_code=202)
 42            send_ssf_event.send_with_options(
 43                args=(stream.pk, event_data), rel_obj=stream.provider
 44            ).get_result(block=True, timeout=1)
 45        self.assertEqual(
 46            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
 47        )
 48        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 49        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 50        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
 51
 52    def test_push_auth(self):
 53        auth = generate_id()
 54        stream = Stream.objects.create(
 55            provider=self.provider,
 56            delivery_method=DeliveryMethods.RFC_PUSH,
 57            endpoint_url="http://localhost/ssf-push",
 58            authorization_header=auth,
 59        )
 60        event_data = stream.prepare_event_payload(
 61            EventTypes.SET_VERIFICATION,
 62            {"state": None},
 63            sub_id={"format": "opaque", "id": str(stream.uuid)},
 64        )
 65        with Mocker() as mocker:
 66            mocker.post("http://localhost/ssf-push", status_code=202)
 67            send_ssf_event.send_with_options(
 68                args=(stream.pk, event_data), rel_obj=stream.provider
 69            ).get_result(block=True, timeout=1)
 70        self.assertEqual(mocker.request_history[0].headers["Authorization"], auth)
 71        self.assertEqual(
 72            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
 73        )
 74        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 75        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 76        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
 77
 78    def test_push_stream_disable(self):
 79        auth = generate_id()
 80        stream = Stream.objects.create(
 81            provider=self.provider,
 82            delivery_method=DeliveryMethods.RFC_PUSH,
 83            endpoint_url="http://localhost/ssf-push",
 84            authorization_header=auth,
 85            status=StreamStatus.DISABLED,
 86        )
 87        event_data = stream.prepare_event_payload(
 88            EventTypes.SET_VERIFICATION,
 89            {"state": None},
 90            sub_id={"format": "opaque", "id": str(stream.uuid)},
 91        )
 92        with Mocker() as mocker:
 93            mocker.post("http://localhost/ssf-push", status_code=202)
 94            send_ssf_event.send_with_options(
 95                args=(stream.pk, event_data), rel_obj=stream.provider
 96            ).get_result(block=True, timeout=1)
 97        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 98        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 99        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
100        self.assertFalse(Stream.objects.filter(pk=stream.pk).exists())
101
102    def test_push_error(self):
103        stream = Stream.objects.create(
104            provider=self.provider,
105            delivery_method=DeliveryMethods.RFC_PUSH,
106            endpoint_url="http://localhost/ssf-push",
107        )
108        event_data = stream.prepare_event_payload(
109            EventTypes.SET_VERIFICATION,
110            {"state": None},
111            sub_id={"format": "opaque", "id": str(stream.uuid)},
112        )
113        with Mocker() as mocker:
114            mocker.post("http://localhost/ssf-push", text="error", status_code=400)
115            send_ssf_event.send_with_options(
116                args=(stream.pk, event_data), rel_obj=stream.provider
117            ).get_result(block=True, timeout=1)
118        logs = (
119            TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name)
120            .order_by("timestamp")
121            .filter(event="Failed to send request")
122            .first()
123        )
124        self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})

Similar to TransactionTestCase, but uses transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
21    def setUp(self):
22        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
23        self.provider = SSFProvider.objects.create(
24            name=generate_id(),
25            signing_key=create_test_cert(),
26            backchannel_application=self.application,
27        )

Hook method for setting up the test fixture before exercising it.

def test_push_simple(self):
29    def test_push_simple(self):
30        stream = Stream.objects.create(
31            provider=self.provider,
32            delivery_method=DeliveryMethods.RFC_PUSH,
33            endpoint_url="http://localhost/ssf-push",
34        )
35        event_data = stream.prepare_event_payload(
36            EventTypes.SET_VERIFICATION,
37            {"state": None},
38            sub_id={"format": "opaque", "id": str(stream.uuid)},
39        )
40        with Mocker() as mocker:
41            mocker.post("http://localhost/ssf-push", status_code=202)
42            send_ssf_event.send_with_options(
43                args=(stream.pk, event_data), rel_obj=stream.provider
44            ).get_result(block=True, timeout=1)
45        self.assertEqual(
46            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
47        )
48        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
49        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
50        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
def test_push_auth(self):
52    def test_push_auth(self):
53        auth = generate_id()
54        stream = Stream.objects.create(
55            provider=self.provider,
56            delivery_method=DeliveryMethods.RFC_PUSH,
57            endpoint_url="http://localhost/ssf-push",
58            authorization_header=auth,
59        )
60        event_data = stream.prepare_event_payload(
61            EventTypes.SET_VERIFICATION,
62            {"state": None},
63            sub_id={"format": "opaque", "id": str(stream.uuid)},
64        )
65        with Mocker() as mocker:
66            mocker.post("http://localhost/ssf-push", status_code=202)
67            send_ssf_event.send_with_options(
68                args=(stream.pk, event_data), rel_obj=stream.provider
69            ).get_result(block=True, timeout=1)
70        self.assertEqual(mocker.request_history[0].headers["Authorization"], auth)
71        self.assertEqual(
72            mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt"
73        )
74        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
75        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
76        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
def test_push_stream_disable(self):
 78    def test_push_stream_disable(self):
 79        auth = generate_id()
 80        stream = Stream.objects.create(
 81            provider=self.provider,
 82            delivery_method=DeliveryMethods.RFC_PUSH,
 83            endpoint_url="http://localhost/ssf-push",
 84            authorization_header=auth,
 85            status=StreamStatus.DISABLED,
 86        )
 87        event_data = stream.prepare_event_payload(
 88            EventTypes.SET_VERIFICATION,
 89            {"state": None},
 90            sub_id={"format": "opaque", "id": str(stream.uuid)},
 91        )
 92        with Mocker() as mocker:
 93            mocker.post("http://localhost/ssf-push", status_code=202)
 94            send_ssf_event.send_with_options(
 95                args=(stream.pk, event_data), rel_obj=stream.provider
 96            ).get_result(block=True, timeout=1)
 97        jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False})
 98        self.assertEqual(jwt["header"]["typ"], "secevent+jwt")
 99        self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
100        self.assertFalse(Stream.objects.filter(pk=stream.pk).exists())
def test_push_error(self):
102    def test_push_error(self):
103        stream = Stream.objects.create(
104            provider=self.provider,
105            delivery_method=DeliveryMethods.RFC_PUSH,
106            endpoint_url="http://localhost/ssf-push",
107        )
108        event_data = stream.prepare_event_payload(
109            EventTypes.SET_VERIFICATION,
110            {"state": None},
111            sub_id={"format": "opaque", "id": str(stream.uuid)},
112        )
113        with Mocker() as mocker:
114            mocker.post("http://localhost/ssf-push", text="error", status_code=400)
115            send_ssf_event.send_with_options(
116                args=(stream.pk, event_data), rel_obj=stream.provider
117            ).get_result(block=True, timeout=1)
118        logs = (
119            TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name)
120            .order_by("timestamp")
121            .filter(event="Failed to send request")
122            .first()
123        )
124        self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})