authentik.sources.saml.tests.test_response

SAML Source tests

  1"""SAML Source tests"""
  2
  3from base64 import b64encode
  4
  5from django.test import TestCase
  6
  7from authentik.core.tests.utils import RequestFactory, create_test_cert, create_test_flow
  8from authentik.crypto.models import CertificateKeyPair
  9from authentik.lib.generators import generate_id
 10from authentik.lib.tests.utils import load_fixture
 11from authentik.sources.saml.exceptions import InvalidEncryption, InvalidSignature
 12from authentik.sources.saml.models import SAMLSource
 13from authentik.sources.saml.processors.response import ResponseProcessor
 14
 15
 16class TestResponseProcessor(TestCase):
 17    """Test ResponseProcessor"""
 18
 19    def setUp(self):
 20        self.factory = RequestFactory()
 21        self.source = SAMLSource.objects.create(
 22            name=generate_id(),
 23            slug=generate_id(),
 24            issuer="authentik",
 25            allow_idp_initiated=True,
 26            pre_authentication_flow=create_test_flow(),
 27        )
 28
 29    def test_status_error(self):
 30        """Test error status"""
 31        request = self.factory.post(
 32            "/",
 33            data={
 34                "SAMLResponse": b64encode(
 35                    load_fixture("fixtures/response_error.xml").encode()
 36                ).decode()
 37            },
 38        )
 39
 40        with self.assertRaisesMessage(
 41            ValueError,
 42            (
 43                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
 44                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
 45            ),
 46        ):
 47            ResponseProcessor(self.source, request).parse()
 48
 49    def test_success(self):
 50        """Test success"""
 51        request = self.factory.post(
 52            "/",
 53            data={
 54                "SAMLResponse": b64encode(
 55                    load_fixture("fixtures/response_success.xml").encode()
 56                ).decode()
 57            },
 58        )
 59
 60        parser = ResponseProcessor(self.source, request)
 61        parser.parse()
 62        sfm = parser.prepare_flow_manager()
 63        self.assertEqual(
 64            sfm.user_properties,
 65            {
 66                "email": "foo@bar.baz",
 67                "name": "foo",
 68                "sn": "bar",
 69                "username": "jens@goauthentik.io",
 70                "attributes": {},
 71                "path": self.source.get_user_path(),
 72            },
 73        )
 74
 75    def test_success_with_status_message_and_detail(self):
 76        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
 77        request = self.factory.post(
 78            "/",
 79            data={
 80                "SAMLResponse": b64encode(
 81                    load_fixture("fixtures/response_success_with_message.xml").encode()
 82                ).decode()
 83            },
 84        )
 85
 86        parser = ResponseProcessor(self.source, request)
 87        parser.parse()
 88        sfm = parser.prepare_flow_manager()
 89        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")
 90
 91    def test_error_with_message_and_detail(self):
 92        """Test error status with StatusMessage and StatusDetail includes both in error"""
 93        request = self.factory.post(
 94            "/",
 95            data={
 96                "SAMLResponse": b64encode(
 97                    load_fixture("fixtures/response_error_with_detail.xml").encode()
 98                ).decode()
 99            },
100        )
101
102        with self.assertRaises(ValueError) as ctx:
103            ResponseProcessor(self.source, request).parse()
104        # Should contain both detail and message
105        self.assertIn("User account is disabled", str(ctx.exception))
106        self.assertIn("Authentication failed", str(ctx.exception))
107
108    def test_encrypted_correct(self):
109        """Test encrypted"""
110        key = load_fixture("fixtures/encrypted-key.pem")
111        kp = CertificateKeyPair.objects.create(
112            name=generate_id(),
113            key_data=key,
114        )
115        self.source.encryption_kp = kp
116        request = self.factory.post(
117            "/",
118            data={
119                "SAMLResponse": b64encode(
120                    load_fixture("fixtures/response_encrypted.xml").encode()
121                ).decode()
122            },
123        )
124
125        parser = ResponseProcessor(self.source, request)
126        parser.parse()
127
128    def test_encrypted_incorrect_key(self):
129        """Test encrypted"""
130        kp = create_test_cert()
131        self.source.encryption_kp = kp
132        request = self.factory.post(
133            "/",
134            data={
135                "SAMLResponse": b64encode(
136                    load_fixture("fixtures/response_encrypted.xml").encode()
137                ).decode()
138            },
139        )
140
141        parser = ResponseProcessor(self.source, request)
142        with self.assertRaises(InvalidEncryption):
143            parser.parse()
144
145    def test_verification_assertion(self):
146        """Test verifying signature inside assertion"""
147        key = load_fixture("fixtures/signature_cert.pem")
148        kp = CertificateKeyPair.objects.create(
149            name=generate_id(),
150            certificate_data=key,
151        )
152        self.source.verification_kp = kp
153        self.source.signed_assertion = True
154        self.source.signed_response = False
155        request = self.factory.post(
156            "/",
157            data={
158                "SAMLResponse": b64encode(
159                    load_fixture("fixtures/response_signed_assertion.xml").encode()
160                ).decode()
161            },
162        )
163
164        parser = ResponseProcessor(self.source, request)
165        parser.parse()
166
167    def test_verification_assertion_duplicate(self):
168        """Test verifying signature inside assertion, where the response has another assertion
169        before our signed assertion"""
170        key = load_fixture("fixtures/signature_cert.pem")
171        kp = CertificateKeyPair.objects.create(
172            name=generate_id(),
173            certificate_data=key,
174        )
175        self.source.verification_kp = kp
176        self.source.signed_assertion = True
177        self.source.signed_response = False
178        request = self.factory.post(
179            "/",
180            data={
181                "SAMLResponse": b64encode(
182                    load_fixture("fixtures/response_signed_assertion_dup.xml").encode()
183                ).decode()
184            },
185        )
186
187        parser = ResponseProcessor(self.source, request)
188        parser.parse()
189        self.assertNotEqual(parser._get_name_id().text, "bad")
190        self.assertEqual(parser._get_name_id().text, "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
191
192    def test_verification_response(self):
193        """Test verifying signature inside response"""
194        key = load_fixture("fixtures/signature_cert.pem")
195        kp = CertificateKeyPair.objects.create(
196            name=generate_id(),
197            certificate_data=key,
198        )
199        self.source.verification_kp = kp
200        self.source.signed_response = True
201        self.source.signed_assertion = False
202        request = self.factory.post(
203            "/",
204            data={
205                "SAMLResponse": b64encode(
206                    load_fixture("fixtures/response_signed_response.xml").encode()
207                ).decode()
208            },
209        )
210
211        parser = ResponseProcessor(self.source, request)
212        parser.parse()
213
214    def test_verification_response_and_assertion(self):
215        """Test verifying signature inside response and assertion"""
216        key = load_fixture("fixtures/signature_cert.pem")
217        kp = CertificateKeyPair.objects.create(
218            name=generate_id(),
219            certificate_data=key,
220        )
221        self.source.verification_kp = kp
222        self.source.signed_assertion = True
223        self.source.signed_response = True
224        request = self.factory.post(
225            "/",
226            data={
227                "SAMLResponse": b64encode(
228                    load_fixture("fixtures/response_signed_response_and_assertion.xml").encode()
229                ).decode()
230            },
231        )
232
233        parser = ResponseProcessor(self.source, request)
234        parser.parse()
235
236    def test_verification_wrong_signature(self):
237        """Test invalid signature fails"""
238        key = load_fixture("fixtures/signature_cert.pem")
239        kp = CertificateKeyPair.objects.create(
240            name=generate_id(),
241            certificate_data=key,
242        )
243        self.source.verification_kp = kp
244        self.source.signed_assertion = True
245        request = self.factory.post(
246            "/",
247            data={
248                "SAMLResponse": b64encode(
249                    # Same as response_signed_assertion.xml but the role name is altered
250                    load_fixture("fixtures/response_signed_error.xml").encode()
251                ).decode()
252            },
253        )
254
255        parser = ResponseProcessor(self.source, request)
256
257        with self.assertRaisesMessage(InvalidSignature, ""):
258            parser.parse()
259
260    def test_verification_no_signature(self):
261        """Test rejecting response without signature when signed_assertion is True"""
262        key = load_fixture("fixtures/signature_cert.pem")
263        kp = CertificateKeyPair.objects.create(
264            name=generate_id(),
265            certificate_data=key,
266        )
267        self.source.verification_kp = kp
268        self.source.signed_assertion = True
269        request = self.factory.post(
270            "/",
271            data={
272                "SAMLResponse": b64encode(
273                    load_fixture("fixtures/response_success.xml").encode()
274                ).decode()
275            },
276        )
277
278        parser = ResponseProcessor(self.source, request)
279
280        with self.assertRaisesMessage(InvalidSignature, ""):
281            parser.parse()
282
283    def test_verification_incorrect_response(self):
284        """Test verifying signature inside response"""
285        key = load_fixture("fixtures/signature_cert.pem")
286        kp = CertificateKeyPair.objects.create(
287            name=generate_id(),
288            certificate_data=key,
289        )
290        self.source.verification_kp = kp
291        self.source.signed_response = True
292        self.source.signed_assertion = False
293        request = self.factory.post(
294            "/",
295            data={
296                "SAMLResponse": b64encode(
297                    load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
298                ).decode()
299            },
300        )
301
302        parser = ResponseProcessor(self.source, request)
303        with self.assertRaisesMessage(InvalidSignature, ""):
304            parser.parse()
305
306    def test_signed_encrypted_response(self):
307        """Test signed & encrypted response"""
308        verification_key = load_fixture("fixtures/signature_cert2.pem")
309        vkp = CertificateKeyPair.objects.create(
310            name=generate_id(),
311            certificate_data=verification_key,
312        )
313
314        encrypted_key = load_fixture("fixtures/encrypted-key2.pem")
315        ekp = CertificateKeyPair.objects.create(name=generate_id(), key_data=encrypted_key)
316
317        self.source.verification_kp = vkp
318        self.source.encryption_kp = ekp
319        self.source.signed_response = True
320        self.source.signed_assertion = False
321        request = self.factory.post(
322            "/",
323            data={
324                "SAMLResponse": b64encode(
325                    load_fixture("fixtures/response_signed_encrypted.xml").encode()
326                ).decode()
327            },
328        )
329
330        parser = ResponseProcessor(self.source, request)
331        parser.parse()
332
333    def test_transient(self):
334        """Test SAML transient NameID"""
335        verification_key = load_fixture("fixtures/signature_cert2.pem")
336        vkp = CertificateKeyPair.objects.create(
337            name=generate_id(),
338            certificate_data=verification_key,
339        )
340        self.source.verification_kp = vkp
341        self.source.signed_response = True
342        self.source.signed_assertion = False
343        request = self.factory.post(
344            "/",
345            data={
346                "SAMLResponse": b64encode(
347                    load_fixture("fixtures/response_transient.xml").encode()
348                ).decode()
349            },
350        )
351
352        parser = ResponseProcessor(self.source, request)
353        parser.parse()
354        parser.prepare_flow_manager()
class TestResponseProcessor(django.test.testcases.TestCase):
 17class TestResponseProcessor(TestCase):
 18    """Test ResponseProcessor"""
 19
 20    def setUp(self):
 21        self.factory = RequestFactory()
 22        self.source = SAMLSource.objects.create(
 23            name=generate_id(),
 24            slug=generate_id(),
 25            issuer="authentik",
 26            allow_idp_initiated=True,
 27            pre_authentication_flow=create_test_flow(),
 28        )
 29
 30    def test_status_error(self):
 31        """Test error status"""
 32        request = self.factory.post(
 33            "/",
 34            data={
 35                "SAMLResponse": b64encode(
 36                    load_fixture("fixtures/response_error.xml").encode()
 37                ).decode()
 38            },
 39        )
 40
 41        with self.assertRaisesMessage(
 42            ValueError,
 43            (
 44                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
 45                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
 46            ),
 47        ):
 48            ResponseProcessor(self.source, request).parse()
 49
 50    def test_success(self):
 51        """Test success"""
 52        request = self.factory.post(
 53            "/",
 54            data={
 55                "SAMLResponse": b64encode(
 56                    load_fixture("fixtures/response_success.xml").encode()
 57                ).decode()
 58            },
 59        )
 60
 61        parser = ResponseProcessor(self.source, request)
 62        parser.parse()
 63        sfm = parser.prepare_flow_manager()
 64        self.assertEqual(
 65            sfm.user_properties,
 66            {
 67                "email": "foo@bar.baz",
 68                "name": "foo",
 69                "sn": "bar",
 70                "username": "jens@goauthentik.io",
 71                "attributes": {},
 72                "path": self.source.get_user_path(),
 73            },
 74        )
 75
 76    def test_success_with_status_message_and_detail(self):
 77        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
 78        request = self.factory.post(
 79            "/",
 80            data={
 81                "SAMLResponse": b64encode(
 82                    load_fixture("fixtures/response_success_with_message.xml").encode()
 83                ).decode()
 84            },
 85        )
 86
 87        parser = ResponseProcessor(self.source, request)
 88        parser.parse()
 89        sfm = parser.prepare_flow_manager()
 90        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")
 91
 92    def test_error_with_message_and_detail(self):
 93        """Test error status with StatusMessage and StatusDetail includes both in error"""
 94        request = self.factory.post(
 95            "/",
 96            data={
 97                "SAMLResponse": b64encode(
 98                    load_fixture("fixtures/response_error_with_detail.xml").encode()
 99                ).decode()
100            },
101        )
102
103        with self.assertRaises(ValueError) as ctx:
104            ResponseProcessor(self.source, request).parse()
105        # Should contain both detail and message
106        self.assertIn("User account is disabled", str(ctx.exception))
107        self.assertIn("Authentication failed", str(ctx.exception))
108
109    def test_encrypted_correct(self):
110        """Test encrypted"""
111        key = load_fixture("fixtures/encrypted-key.pem")
112        kp = CertificateKeyPair.objects.create(
113            name=generate_id(),
114            key_data=key,
115        )
116        self.source.encryption_kp = kp
117        request = self.factory.post(
118            "/",
119            data={
120                "SAMLResponse": b64encode(
121                    load_fixture("fixtures/response_encrypted.xml").encode()
122                ).decode()
123            },
124        )
125
126        parser = ResponseProcessor(self.source, request)
127        parser.parse()
128
129    def test_encrypted_incorrect_key(self):
130        """Test encrypted"""
131        kp = create_test_cert()
132        self.source.encryption_kp = kp
133        request = self.factory.post(
134            "/",
135            data={
136                "SAMLResponse": b64encode(
137                    load_fixture("fixtures/response_encrypted.xml").encode()
138                ).decode()
139            },
140        )
141
142        parser = ResponseProcessor(self.source, request)
143        with self.assertRaises(InvalidEncryption):
144            parser.parse()
145
146    def test_verification_assertion(self):
147        """Test verifying signature inside assertion"""
148        key = load_fixture("fixtures/signature_cert.pem")
149        kp = CertificateKeyPair.objects.create(
150            name=generate_id(),
151            certificate_data=key,
152        )
153        self.source.verification_kp = kp
154        self.source.signed_assertion = True
155        self.source.signed_response = False
156        request = self.factory.post(
157            "/",
158            data={
159                "SAMLResponse": b64encode(
160                    load_fixture("fixtures/response_signed_assertion.xml").encode()
161                ).decode()
162            },
163        )
164
165        parser = ResponseProcessor(self.source, request)
166        parser.parse()
167
168    def test_verification_assertion_duplicate(self):
169        """Test verifying signature inside assertion, where the response has another assertion
170        before our signed assertion"""
171        key = load_fixture("fixtures/signature_cert.pem")
172        kp = CertificateKeyPair.objects.create(
173            name=generate_id(),
174            certificate_data=key,
175        )
176        self.source.verification_kp = kp
177        self.source.signed_assertion = True
178        self.source.signed_response = False
179        request = self.factory.post(
180            "/",
181            data={
182                "SAMLResponse": b64encode(
183                    load_fixture("fixtures/response_signed_assertion_dup.xml").encode()
184                ).decode()
185            },
186        )
187
188        parser = ResponseProcessor(self.source, request)
189        parser.parse()
190        self.assertNotEqual(parser._get_name_id().text, "bad")
191        self.assertEqual(parser._get_name_id().text, "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
192
193    def test_verification_response(self):
194        """Test verifying signature inside response"""
195        key = load_fixture("fixtures/signature_cert.pem")
196        kp = CertificateKeyPair.objects.create(
197            name=generate_id(),
198            certificate_data=key,
199        )
200        self.source.verification_kp = kp
201        self.source.signed_response = True
202        self.source.signed_assertion = False
203        request = self.factory.post(
204            "/",
205            data={
206                "SAMLResponse": b64encode(
207                    load_fixture("fixtures/response_signed_response.xml").encode()
208                ).decode()
209            },
210        )
211
212        parser = ResponseProcessor(self.source, request)
213        parser.parse()
214
215    def test_verification_response_and_assertion(self):
216        """Test verifying signature inside response and assertion"""
217        key = load_fixture("fixtures/signature_cert.pem")
218        kp = CertificateKeyPair.objects.create(
219            name=generate_id(),
220            certificate_data=key,
221        )
222        self.source.verification_kp = kp
223        self.source.signed_assertion = True
224        self.source.signed_response = True
225        request = self.factory.post(
226            "/",
227            data={
228                "SAMLResponse": b64encode(
229                    load_fixture("fixtures/response_signed_response_and_assertion.xml").encode()
230                ).decode()
231            },
232        )
233
234        parser = ResponseProcessor(self.source, request)
235        parser.parse()
236
237    def test_verification_wrong_signature(self):
238        """Test invalid signature fails"""
239        key = load_fixture("fixtures/signature_cert.pem")
240        kp = CertificateKeyPair.objects.create(
241            name=generate_id(),
242            certificate_data=key,
243        )
244        self.source.verification_kp = kp
245        self.source.signed_assertion = True
246        request = self.factory.post(
247            "/",
248            data={
249                "SAMLResponse": b64encode(
250                    # Same as response_signed_assertion.xml but the role name is altered
251                    load_fixture("fixtures/response_signed_error.xml").encode()
252                ).decode()
253            },
254        )
255
256        parser = ResponseProcessor(self.source, request)
257
258        with self.assertRaisesMessage(InvalidSignature, ""):
259            parser.parse()
260
261    def test_verification_no_signature(self):
262        """Test rejecting response without signature when signed_assertion is True"""
263        key = load_fixture("fixtures/signature_cert.pem")
264        kp = CertificateKeyPair.objects.create(
265            name=generate_id(),
266            certificate_data=key,
267        )
268        self.source.verification_kp = kp
269        self.source.signed_assertion = True
270        request = self.factory.post(
271            "/",
272            data={
273                "SAMLResponse": b64encode(
274                    load_fixture("fixtures/response_success.xml").encode()
275                ).decode()
276            },
277        )
278
279        parser = ResponseProcessor(self.source, request)
280
281        with self.assertRaisesMessage(InvalidSignature, ""):
282            parser.parse()
283
284    def test_verification_incorrect_response(self):
285        """Test verifying signature inside response"""
286        key = load_fixture("fixtures/signature_cert.pem")
287        kp = CertificateKeyPair.objects.create(
288            name=generate_id(),
289            certificate_data=key,
290        )
291        self.source.verification_kp = kp
292        self.source.signed_response = True
293        self.source.signed_assertion = False
294        request = self.factory.post(
295            "/",
296            data={
297                "SAMLResponse": b64encode(
298                    load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
299                ).decode()
300            },
301        )
302
303        parser = ResponseProcessor(self.source, request)
304        with self.assertRaisesMessage(InvalidSignature, ""):
305            parser.parse()
306
307    def test_signed_encrypted_response(self):
308        """Test signed & encrypted response"""
309        verification_key = load_fixture("fixtures/signature_cert2.pem")
310        vkp = CertificateKeyPair.objects.create(
311            name=generate_id(),
312            certificate_data=verification_key,
313        )
314
315        encrypted_key = load_fixture("fixtures/encrypted-key2.pem")
316        ekp = CertificateKeyPair.objects.create(name=generate_id(), key_data=encrypted_key)
317
318        self.source.verification_kp = vkp
319        self.source.encryption_kp = ekp
320        self.source.signed_response = True
321        self.source.signed_assertion = False
322        request = self.factory.post(
323            "/",
324            data={
325                "SAMLResponse": b64encode(
326                    load_fixture("fixtures/response_signed_encrypted.xml").encode()
327                ).decode()
328            },
329        )
330
331        parser = ResponseProcessor(self.source, request)
332        parser.parse()
333
334    def test_transient(self):
335        """Test SAML transient NameID"""
336        verification_key = load_fixture("fixtures/signature_cert2.pem")
337        vkp = CertificateKeyPair.objects.create(
338            name=generate_id(),
339            certificate_data=verification_key,
340        )
341        self.source.verification_kp = vkp
342        self.source.signed_response = True
343        self.source.signed_assertion = False
344        request = self.factory.post(
345            "/",
346            data={
347                "SAMLResponse": b64encode(
348                    load_fixture("fixtures/response_transient.xml").encode()
349                ).decode()
350            },
351        )
352
353        parser = ResponseProcessor(self.source, request)
354        parser.parse()
355        parser.prepare_flow_manager()

Test ResponseProcessor

def setUp(self):
20    def setUp(self):
21        self.factory = RequestFactory()
22        self.source = SAMLSource.objects.create(
23            name=generate_id(),
24            slug=generate_id(),
25            issuer="authentik",
26            allow_idp_initiated=True,
27            pre_authentication_flow=create_test_flow(),
28        )

Hook method for setting up the test fixture before exercising it.

def test_status_error(self):
30    def test_status_error(self):
31        """Test error status"""
32        request = self.factory.post(
33            "/",
34            data={
35                "SAMLResponse": b64encode(
36                    load_fixture("fixtures/response_error.xml").encode()
37                ).decode()
38            },
39        )
40
41        with self.assertRaisesMessage(
42            ValueError,
43            (
44                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
45                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
46            ),
47        ):
48            ResponseProcessor(self.source, request).parse()

Test error status

def test_success(self):
50    def test_success(self):
51        """Test success"""
52        request = self.factory.post(
53            "/",
54            data={
55                "SAMLResponse": b64encode(
56                    load_fixture("fixtures/response_success.xml").encode()
57                ).decode()
58            },
59        )
60
61        parser = ResponseProcessor(self.source, request)
62        parser.parse()
63        sfm = parser.prepare_flow_manager()
64        self.assertEqual(
65            sfm.user_properties,
66            {
67                "email": "foo@bar.baz",
68                "name": "foo",
69                "sn": "bar",
70                "username": "jens@goauthentik.io",
71                "attributes": {},
72                "path": self.source.get_user_path(),
73            },
74        )

Test success

def test_success_with_status_message_and_detail(self):
76    def test_success_with_status_message_and_detail(self):
77        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
78        request = self.factory.post(
79            "/",
80            data={
81                "SAMLResponse": b64encode(
82                    load_fixture("fixtures/response_success_with_message.xml").encode()
83                ).decode()
84            },
85        )
86
87        parser = ResponseProcessor(self.source, request)
88        parser.parse()
89        sfm = parser.prepare_flow_manager()
90        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")

Test success with StatusMessage and StatusDetail present (should not raise error)

def test_error_with_message_and_detail(self):
 92    def test_error_with_message_and_detail(self):
 93        """Test error status with StatusMessage and StatusDetail includes both in error"""
 94        request = self.factory.post(
 95            "/",
 96            data={
 97                "SAMLResponse": b64encode(
 98                    load_fixture("fixtures/response_error_with_detail.xml").encode()
 99                ).decode()
100            },
101        )
102
103        with self.assertRaises(ValueError) as ctx:
104            ResponseProcessor(self.source, request).parse()
105        # Should contain both detail and message
106        self.assertIn("User account is disabled", str(ctx.exception))
107        self.assertIn("Authentication failed", str(ctx.exception))

Test error status with StatusMessage and StatusDetail includes both in error

def test_encrypted_correct(self):
109    def test_encrypted_correct(self):
110        """Test encrypted"""
111        key = load_fixture("fixtures/encrypted-key.pem")
112        kp = CertificateKeyPair.objects.create(
113            name=generate_id(),
114            key_data=key,
115        )
116        self.source.encryption_kp = kp
117        request = self.factory.post(
118            "/",
119            data={
120                "SAMLResponse": b64encode(
121                    load_fixture("fixtures/response_encrypted.xml").encode()
122                ).decode()
123            },
124        )
125
126        parser = ResponseProcessor(self.source, request)
127        parser.parse()

Test encrypted

def test_encrypted_incorrect_key(self):
129    def test_encrypted_incorrect_key(self):
130        """Test encrypted"""
131        kp = create_test_cert()
132        self.source.encryption_kp = kp
133        request = self.factory.post(
134            "/",
135            data={
136                "SAMLResponse": b64encode(
137                    load_fixture("fixtures/response_encrypted.xml").encode()
138                ).decode()
139            },
140        )
141
142        parser = ResponseProcessor(self.source, request)
143        with self.assertRaises(InvalidEncryption):
144            parser.parse()

Test that an encrypted response is rejected (InvalidEncryption) when the configured key does not match

def test_verification_assertion(self):
146    def test_verification_assertion(self):
147        """Test verifying signature inside assertion"""
148        key = load_fixture("fixtures/signature_cert.pem")
149        kp = CertificateKeyPair.objects.create(
150            name=generate_id(),
151            certificate_data=key,
152        )
153        self.source.verification_kp = kp
154        self.source.signed_assertion = True
155        self.source.signed_response = False
156        request = self.factory.post(
157            "/",
158            data={
159                "SAMLResponse": b64encode(
160                    load_fixture("fixtures/response_signed_assertion.xml").encode()
161                ).decode()
162            },
163        )
164
165        parser = ResponseProcessor(self.source, request)
166        parser.parse()

Test verifying signature inside assertion

def test_verification_assertion_duplicate(self):
    """Test verifying the signature inside an assertion when the response contains
    another assertion ahead of the signed one"""
    expected_name_id = "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7"

    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(certificate_data=cert_pem, name=generate_id())

    self.source.verification_kp = keypair
    self.source.signed_assertion = True
    self.source.signed_response = False

    fixture_xml = load_fixture("fixtures/response_signed_assertion_dup.xml")
    encoded_response = b64encode(fixture_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded_response})

    processor = ResponseProcessor(self.source, request)
    processor.parse()

    # "bad" is presumably the NameID carried by the leading assertion in the
    # fixture -- confirm against the XML. The NameID must be the expected value.
    self.assertNotEqual(processor._get_name_id().text, "bad")
    self.assertEqual(processor._get_name_id().text, expected_name_id)

Test verifying signature inside assertion, where the response has another assertion before our signed assertion

def test_verification_response(self):
    """Test that a response signed at the response level passes verification"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(certificate_data=cert_pem, name=generate_id())

    # Only the response signature is required for this source.
    self.source.verification_kp = keypair
    self.source.signed_response = True
    self.source.signed_assertion = False

    fixture_xml = load_fixture("fixtures/response_signed_response.xml")
    encoded_response = b64encode(fixture_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded_response})

    # The test passes when parse() completes without raising.
    processor = ResponseProcessor(self.source, request)
    processor.parse()

Test verifying signature inside response

def test_verification_response_and_assertion(self):
    """Test verification when both the response and the assertion must be signed"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(certificate_data=cert_pem, name=generate_id())

    # Require signatures at both levels.
    self.source.verification_kp = keypair
    self.source.signed_assertion = True
    self.source.signed_response = True

    fixture_xml = load_fixture("fixtures/response_signed_response_and_assertion.xml")
    encoded_response = b64encode(fixture_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded_response})

    # The test passes when parse() completes without raising.
    processor = ResponseProcessor(self.source, request)
    processor.parse()

Test verifying signature inside response and assertion

def test_verification_wrong_signature(self):
    """Test that a response with an invalid assertion signature raises InvalidSignature"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.verification_kp = keypair
    self.source.signed_assertion = True

    # Same as response_signed_assertion.xml but the role name is altered
    fixture_xml = load_fixture("fixtures/response_signed_error.xml")
    request = self.factory.post(
        "/",
        data={"SAMLResponse": b64encode(fixture_xml.encode()).decode()},
    )

    processor = ResponseProcessor(self.source, request)

    # assertRaisesMessage(..., "") only checks that "" is a substring of the
    # message, which is always true; assertRaises states the real expectation.
    with self.assertRaises(InvalidSignature):
        processor.parse()

Test invalid signature fails

def test_verification_no_signature(self):
    """Test rejecting response without signature when signed_assertion is True"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.verification_kp = keypair
    self.source.signed_assertion = True

    # response_success.xml is the fixture used here as the response without a
    # signature, as the docstring describes.
    fixture_xml = load_fixture("fixtures/response_success.xml")
    request = self.factory.post(
        "/",
        data={"SAMLResponse": b64encode(fixture_xml.encode()).decode()},
    )

    processor = ResponseProcessor(self.source, request)

    # assertRaisesMessage(..., "") only checks that "" is a substring of the
    # message, which is always true; assertRaises states the real expectation.
    with self.assertRaises(InvalidSignature):
        processor.parse()

Test rejecting response without signature when signed_assertion is True

def test_verification_incorrect_response(self):
    """Test that a response with an incorrect response-level signature raises InvalidSignature"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    keypair = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    # Only the response signature is required for this source.
    self.source.verification_kp = keypair
    self.source.signed_response = True
    self.source.signed_assertion = False

    fixture_xml = load_fixture("fixtures/response_incorrect_signed_response.xml")
    request = self.factory.post(
        "/",
        data={"SAMLResponse": b64encode(fixture_xml.encode()).decode()},
    )

    processor = ResponseProcessor(self.source, request)
    # assertRaisesMessage(..., "") only checks that "" is a substring of the
    # message, which is always true; assertRaises states the real expectation.
    with self.assertRaises(InvalidSignature):
        processor.parse()

Test that a response with an incorrect response-level signature is rejected

def test_signed_encrypted_response(self):
    """Test parsing a response that is both signed and encrypted"""
    # Certificate used to verify the signature.
    signing_cert_pem = load_fixture("fixtures/signature_cert2.pem")
    verification_pair = CertificateKeyPair.objects.create(
        certificate_data=signing_cert_pem, name=generate_id()
    )

    # Key material assigned as the source's encryption keypair.
    decryption_key_pem = load_fixture("fixtures/encrypted-key2.pem")
    encryption_pair = CertificateKeyPair.objects.create(
        key_data=decryption_key_pem, name=generate_id()
    )

    self.source.verification_kp = verification_pair
    self.source.encryption_kp = encryption_pair
    self.source.signed_response = True
    self.source.signed_assertion = False

    fixture_xml = load_fixture("fixtures/response_signed_encrypted.xml")
    encoded_response = b64encode(fixture_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded_response})

    # The test passes when parse() completes without raising.
    processor = ResponseProcessor(self.source, request)
    processor.parse()

Test signed & encrypted response

def test_transient(self):
    """Test a response with a transient NameID, through to preparing the flow manager"""
    signing_cert_pem = load_fixture("fixtures/signature_cert2.pem")
    verification_pair = CertificateKeyPair.objects.create(
        certificate_data=signing_cert_pem, name=generate_id()
    )

    self.source.verification_kp = verification_pair
    self.source.signed_response = True
    self.source.signed_assertion = False

    fixture_xml = load_fixture("fixtures/response_transient.xml")
    encoded_response = b64encode(fixture_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded_response})

    # Both steps must complete without raising for the test to pass.
    processor = ResponseProcessor(self.source, request)
    processor.parse()
    processor.prepare_flow_manager()

Test SAML transient NameID