authentik.enterprise.providers.ssf.tests.test_tasks
1from jwt import decode_complete 2from requests_mock import Mocker 3from rest_framework.test import APITestCase 4 5from authentik.core.models import Application 6from authentik.core.tests.utils import create_test_cert 7from authentik.enterprise.providers.ssf.models import ( 8 DeliveryMethods, 9 EventTypes, 10 SSFProvider, 11 Stream, 12 StreamStatus, 13) 14from authentik.enterprise.providers.ssf.tasks import send_ssf_event 15from authentik.lib.generators import generate_id 16from authentik.tasks.models import TaskLog 17 18 19class TestTasks(APITestCase): 20 def setUp(self): 21 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 22 self.provider = SSFProvider.objects.create( 23 name=generate_id(), 24 signing_key=create_test_cert(), 25 backchannel_application=self.application, 26 ) 27 28 def test_push_simple(self): 29 stream = Stream.objects.create( 30 provider=self.provider, 31 delivery_method=DeliveryMethods.RFC_PUSH, 32 endpoint_url="http://localhost/ssf-push", 33 ) 34 event_data = stream.prepare_event_payload( 35 EventTypes.SET_VERIFICATION, 36 {"state": None}, 37 sub_id={"format": "opaque", "id": str(stream.uuid)}, 38 ) 39 with Mocker() as mocker: 40 mocker.post("http://localhost/ssf-push", status_code=202) 41 send_ssf_event.send_with_options( 42 args=(stream.pk, event_data), rel_obj=stream.provider 43 ).get_result(block=True, timeout=1) 44 self.assertEqual( 45 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 46 ) 47 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 48 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 49 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 50 51 def test_push_auth(self): 52 auth = generate_id() 53 stream = Stream.objects.create( 54 provider=self.provider, 55 delivery_method=DeliveryMethods.RFC_PUSH, 56 endpoint_url="http://localhost/ssf-push", 57 authorization_header=auth, 58 ) 59 event_data = stream.prepare_event_payload( 60 EventTypes.SET_VERIFICATION, 61 {"state": None}, 62 sub_id={"format": "opaque", "id": str(stream.uuid)}, 63 ) 64 with Mocker() as mocker: 65 mocker.post("http://localhost/ssf-push", status_code=202) 66 send_ssf_event.send_with_options( 67 args=(stream.pk, event_data), rel_obj=stream.provider 68 ).get_result(block=True, timeout=1) 69 self.assertEqual(mocker.request_history[0].headers["Authorization"], auth) 70 self.assertEqual( 71 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 72 ) 73 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 74 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 75 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 76 77 def test_push_stream_disable(self): 78 auth = generate_id() 79 stream = Stream.objects.create( 80 provider=self.provider, 81 delivery_method=DeliveryMethods.RFC_PUSH, 82 endpoint_url="http://localhost/ssf-push", 83 authorization_header=auth, 84 status=StreamStatus.DISABLED, 85 ) 86 event_data = stream.prepare_event_payload( 87 EventTypes.SET_VERIFICATION, 88 {"state": None}, 89 sub_id={"format": "opaque", "id": str(stream.uuid)}, 90 ) 91 with Mocker() as mocker: 92 mocker.post("http://localhost/ssf-push", status_code=202) 93 send_ssf_event.send_with_options( 94 args=(stream.pk, event_data), rel_obj=stream.provider 95 ).get_result(block=True, timeout=1) 96 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 97 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 98 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 99 self.assertFalse(Stream.objects.filter(pk=stream.pk).exists()) 100 101 def test_push_error(self): 102 stream = Stream.objects.create( 103 provider=self.provider, 104 delivery_method=DeliveryMethods.RFC_PUSH, 105 endpoint_url="http://localhost/ssf-push", 106 ) 107 event_data = stream.prepare_event_payload( 108 EventTypes.SET_VERIFICATION, 109 {"state": None}, 110 sub_id={"format": "opaque", "id": str(stream.uuid)}, 111 ) 112 with Mocker() as mocker: 113 mocker.post("http://localhost/ssf-push", text="error", status_code=400) 114 send_ssf_event.send_with_options( 115 args=(stream.pk, event_data), rel_obj=stream.provider 116 ).get_result(block=True, timeout=1) 117 logs = ( 118 TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name) 119 .order_by("timestamp") 120 .filter(event="Failed to send request") 121 .first() 122 ) 123 self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})
class
TestTasks(rest_framework.test.APITestCase):
20class TestTasks(APITestCase): 21 def setUp(self): 22 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 23 self.provider = SSFProvider.objects.create( 24 name=generate_id(), 25 signing_key=create_test_cert(), 26 backchannel_application=self.application, 27 ) 28 29 def test_push_simple(self): 30 stream = Stream.objects.create( 31 provider=self.provider, 32 delivery_method=DeliveryMethods.RFC_PUSH, 33 endpoint_url="http://localhost/ssf-push", 34 ) 35 event_data = stream.prepare_event_payload( 36 EventTypes.SET_VERIFICATION, 37 {"state": None}, 38 sub_id={"format": "opaque", "id": str(stream.uuid)}, 39 ) 40 with Mocker() as mocker: 41 mocker.post("http://localhost/ssf-push", status_code=202) 42 send_ssf_event.send_with_options( 43 args=(stream.pk, event_data), rel_obj=stream.provider 44 ).get_result(block=True, timeout=1) 45 self.assertEqual( 46 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 47 ) 48 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 49 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 50 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 51 52 def test_push_auth(self): 53 auth = generate_id() 54 stream = Stream.objects.create( 55 provider=self.provider, 56 delivery_method=DeliveryMethods.RFC_PUSH, 57 endpoint_url="http://localhost/ssf-push", 58 authorization_header=auth, 59 ) 60 event_data = stream.prepare_event_payload( 61 EventTypes.SET_VERIFICATION, 62 {"state": None}, 63 sub_id={"format": "opaque", "id": str(stream.uuid)}, 64 ) 65 with Mocker() as mocker: 66 mocker.post("http://localhost/ssf-push", status_code=202) 67 send_ssf_event.send_with_options( 68 args=(stream.pk, event_data), rel_obj=stream.provider 69 ).get_result(block=True, timeout=1) 70 self.assertEqual(mocker.request_history[0].headers["Authorization"], auth) 71 self.assertEqual( 72 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 73 ) 74 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 75 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 76 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 77 78 def test_push_stream_disable(self): 79 auth = generate_id() 80 stream = Stream.objects.create( 81 provider=self.provider, 82 delivery_method=DeliveryMethods.RFC_PUSH, 83 endpoint_url="http://localhost/ssf-push", 84 authorization_header=auth, 85 status=StreamStatus.DISABLED, 86 ) 87 event_data = stream.prepare_event_payload( 88 EventTypes.SET_VERIFICATION, 89 {"state": None}, 90 sub_id={"format": "opaque", "id": str(stream.uuid)}, 91 ) 92 with Mocker() as mocker: 93 mocker.post("http://localhost/ssf-push", status_code=202) 94 send_ssf_event.send_with_options( 95 args=(stream.pk, event_data), rel_obj=stream.provider 96 ).get_result(block=True, timeout=1) 97 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 98 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 99 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 100 self.assertFalse(Stream.objects.filter(pk=stream.pk).exists()) 101 102 def test_push_error(self): 103 stream = Stream.objects.create( 104 provider=self.provider, 105 delivery_method=DeliveryMethods.RFC_PUSH, 106 endpoint_url="http://localhost/ssf-push", 107 ) 108 event_data = stream.prepare_event_payload( 109 EventTypes.SET_VERIFICATION, 110 {"state": None}, 111 sub_id={"format": "opaque", "id": str(stream.uuid)}, 112 ) 113 with Mocker() as mocker: 114 mocker.post("http://localhost/ssf-push", text="error", status_code=400) 115 send_ssf_event.send_with_options( 116 args=(stream.pk, event_data), rel_obj=stream.provider 117 ).get_result(block=True, timeout=1) 118 logs = ( 119 TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name) 120 .order_by("timestamp") 121 .filter(event="Failed to send request") 122 .first() 123 ) 124 self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})
Similar to TransactionTestCase, but use transaction.atomic() to achieve
test isolation.
In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).
On database backends with no transaction support, TestCase behaves as TransactionTestCase.
def
setUp(self):
21 def setUp(self): 22 self.application = Application.objects.create(name=generate_id(), slug=generate_id()) 23 self.provider = SSFProvider.objects.create( 24 name=generate_id(), 25 signing_key=create_test_cert(), 26 backchannel_application=self.application, 27 )
Hook method for setting up the test fixture before exercising it.
def
test_push_simple(self):
29 def test_push_simple(self): 30 stream = Stream.objects.create( 31 provider=self.provider, 32 delivery_method=DeliveryMethods.RFC_PUSH, 33 endpoint_url="http://localhost/ssf-push", 34 ) 35 event_data = stream.prepare_event_payload( 36 EventTypes.SET_VERIFICATION, 37 {"state": None}, 38 sub_id={"format": "opaque", "id": str(stream.uuid)}, 39 ) 40 with Mocker() as mocker: 41 mocker.post("http://localhost/ssf-push", status_code=202) 42 send_ssf_event.send_with_options( 43 args=(stream.pk, event_data), rel_obj=stream.provider 44 ).get_result(block=True, timeout=1) 45 self.assertEqual( 46 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 47 ) 48 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 49 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 50 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
def
test_push_auth(self):
52 def test_push_auth(self): 53 auth = generate_id() 54 stream = Stream.objects.create( 55 provider=self.provider, 56 delivery_method=DeliveryMethods.RFC_PUSH, 57 endpoint_url="http://localhost/ssf-push", 58 authorization_header=auth, 59 ) 60 event_data = stream.prepare_event_payload( 61 EventTypes.SET_VERIFICATION, 62 {"state": None}, 63 sub_id={"format": "opaque", "id": str(stream.uuid)}, 64 ) 65 with Mocker() as mocker: 66 mocker.post("http://localhost/ssf-push", status_code=202) 67 send_ssf_event.send_with_options( 68 args=(stream.pk, event_data), rel_obj=stream.provider 69 ).get_result(block=True, timeout=1) 70 self.assertEqual(mocker.request_history[0].headers["Authorization"], auth) 71 self.assertEqual( 72 mocker.request_history[0].headers["Content-Type"], "application/secevent+jwt" 73 ) 74 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 75 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 76 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"])
def
test_push_stream_disable(self):
78 def test_push_stream_disable(self): 79 auth = generate_id() 80 stream = Stream.objects.create( 81 provider=self.provider, 82 delivery_method=DeliveryMethods.RFC_PUSH, 83 endpoint_url="http://localhost/ssf-push", 84 authorization_header=auth, 85 status=StreamStatus.DISABLED, 86 ) 87 event_data = stream.prepare_event_payload( 88 EventTypes.SET_VERIFICATION, 89 {"state": None}, 90 sub_id={"format": "opaque", "id": str(stream.uuid)}, 91 ) 92 with Mocker() as mocker: 93 mocker.post("http://localhost/ssf-push", status_code=202) 94 send_ssf_event.send_with_options( 95 args=(stream.pk, event_data), rel_obj=stream.provider 96 ).get_result(block=True, timeout=1) 97 jwt = decode_complete(mocker.request_history[0].body, options={"verify_signature": False}) 98 self.assertEqual(jwt["header"]["typ"], "secevent+jwt") 99 self.assertIsNone(jwt["payload"]["events"][EventTypes.SET_VERIFICATION]["state"]) 100 self.assertFalse(Stream.objects.filter(pk=stream.pk).exists())
def
test_push_error(self):
102 def test_push_error(self): 103 stream = Stream.objects.create( 104 provider=self.provider, 105 delivery_method=DeliveryMethods.RFC_PUSH, 106 endpoint_url="http://localhost/ssf-push", 107 ) 108 event_data = stream.prepare_event_payload( 109 EventTypes.SET_VERIFICATION, 110 {"state": None}, 111 sub_id={"format": "opaque", "id": str(stream.uuid)}, 112 ) 113 with Mocker() as mocker: 114 mocker.post("http://localhost/ssf-push", text="error", status_code=400) 115 send_ssf_event.send_with_options( 116 args=(stream.pk, event_data), rel_obj=stream.provider 117 ).get_result(block=True, timeout=1) 118 logs = ( 119 TaskLog.objects.filter(task__actor_name=send_ssf_event.actor_name) 120 .order_by("timestamp") 121 .filter(event="Failed to send request") 122 .first() 123 ) 124 self.assertEqual(logs.attributes, {"response": {"status": 400, "content": "error"}})