authentik.sources.saml.tests.test_response

SAML Source tests

  1"""SAML Source tests"""
  2
  3from base64 import b64encode
  4
  5from django.test import TestCase
  6from freezegun import freeze_time
  7
  8from authentik.core.tests.utils import RequestFactory, create_test_cert, create_test_flow
  9from authentik.crypto.models import CertificateKeyPair
 10from authentik.lib.generators import generate_id
 11from authentik.lib.tests.utils import load_fixture
 12from authentik.sources.saml.exceptions import InvalidEncryption, InvalidSignature
 13from authentik.sources.saml.models import SAMLSource
 14from authentik.sources.saml.processors.response import ResponseProcessor
 15
 16
 17class TestResponseProcessor(TestCase):
 18    """Test ResponseProcessor"""
 19
 20    def setUp(self):
 21        self.factory = RequestFactory()
 22        self.source = SAMLSource.objects.create(
 23            name=generate_id(),
 24            slug=generate_id(),
 25            issuer="authentik",
 26            allow_idp_initiated=True,
 27            pre_authentication_flow=create_test_flow(),
 28        )
 29
 30    def test_status_error(self):
 31        """Test error status"""
 32        request = self.factory.post(
 33            "/",
 34            data={
 35                "SAMLResponse": b64encode(
 36                    load_fixture("fixtures/response_error.xml").encode()
 37                ).decode()
 38            },
 39        )
 40
 41        with self.assertRaisesMessage(
 42            ValueError,
 43            (
 44                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
 45                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
 46            ),
 47        ):
 48            ResponseProcessor(self.source, request).parse()
 49
 50    @freeze_time("2022-10-14T14:15:00")
 51    def test_success(self):
 52        """Test success"""
 53        request = self.factory.post(
 54            "/",
 55            data={
 56                "SAMLResponse": b64encode(
 57                    load_fixture("fixtures/response_success.xml").encode()
 58                ).decode()
 59            },
 60        )
 61
 62        parser = ResponseProcessor(self.source, request)
 63        parser.parse()
 64        sfm = parser.prepare_flow_manager()
 65        self.assertEqual(
 66            sfm.user_properties,
 67            {
 68                "email": "foo@bar.baz",
 69                "name": "foo",
 70                "sn": "bar",
 71                "username": "jens@goauthentik.io",
 72                "attributes": {},
 73                "path": self.source.get_user_path(),
 74            },
 75        )
 76
 77    @freeze_time("2022-10-14T14:16:40Z")
 78    def test_success_with_status_message_and_detail(self):
 79        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
 80        request = self.factory.post(
 81            "/",
 82            data={
 83                "SAMLResponse": b64encode(
 84                    load_fixture("fixtures/response_success_with_message.xml").encode()
 85                ).decode()
 86            },
 87        )
 88
 89        parser = ResponseProcessor(self.source, request)
 90        parser.parse()
 91        sfm = parser.prepare_flow_manager()
 92        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")
 93
 94    @freeze_time("2022-10-14T14:16:40Z")
 95    def test_error_with_message_and_detail(self):
 96        """Test error status with StatusMessage and StatusDetail includes both in error"""
 97        request = self.factory.post(
 98            "/",
 99            data={
100                "SAMLResponse": b64encode(
101                    load_fixture("fixtures/response_error_with_detail.xml").encode()
102                ).decode()
103            },
104        )
105
106        with self.assertRaises(ValueError) as ctx:
107            ResponseProcessor(self.source, request).parse()
108        # Should contain both detail and message
109        self.assertIn("User account is disabled", str(ctx.exception))
110        self.assertIn("Authentication failed", str(ctx.exception))
111
112    @freeze_time("2024-08-07T15:48:09.325Z")
113    def test_encrypted_correct(self):
114        """Test encrypted"""
115        key = load_fixture("fixtures/encrypted-key.pem")
116        kp = CertificateKeyPair.objects.create(
117            name=generate_id(),
118            key_data=key,
119        )
120        self.source.encryption_kp = kp
121        request = self.factory.post(
122            "/",
123            data={
124                "SAMLResponse": b64encode(
125                    load_fixture("fixtures/response_encrypted.xml").encode()
126                ).decode()
127            },
128        )
129
130        parser = ResponseProcessor(self.source, request)
131        parser.parse()
132
133    def test_encrypted_incorrect_key(self):
134        """Test encrypted"""
135        kp = create_test_cert()
136        self.source.encryption_kp = kp
137        request = self.factory.post(
138            "/",
139            data={
140                "SAMLResponse": b64encode(
141                    load_fixture("fixtures/response_encrypted.xml").encode()
142                ).decode()
143            },
144        )
145
146        parser = ResponseProcessor(self.source, request)
147        with self.assertRaises(InvalidEncryption):
148            parser.parse()
149
150    @freeze_time("2022-10-14T14:16:40Z")
151    def test_verification_assertion(self):
152        """Test verifying signature inside assertion"""
153        key = load_fixture("fixtures/signature_cert.pem")
154        kp = CertificateKeyPair.objects.create(
155            name=generate_id(),
156            certificate_data=key,
157        )
158        self.source.verification_kp = kp
159        self.source.signed_assertion = True
160        self.source.signed_response = False
161        request = self.factory.post(
162            "/",
163            data={
164                "SAMLResponse": b64encode(
165                    load_fixture("fixtures/response_signed_assertion.xml").encode()
166                ).decode()
167            },
168        )
169
170        parser = ResponseProcessor(self.source, request)
171        parser.parse()
172
173    @freeze_time("2014-07-17T01:02:18Z")
174    def test_verification_assertion_duplicate(self):
175        """Test verifying signature inside assertion, where the response has another assertion
176        before our signed assertion"""
177        key = load_fixture("fixtures/signature_cert.pem")
178        kp = CertificateKeyPair.objects.create(
179            name=generate_id(),
180            certificate_data=key,
181        )
182        self.source.verification_kp = kp
183        self.source.signed_assertion = True
184        self.source.signed_response = False
185        request = self.factory.post(
186            "/",
187            data={
188                "SAMLResponse": b64encode(
189                    load_fixture("fixtures/response_signed_assertion_dup.xml").encode()
190                ).decode()
191            },
192        )
193
194        parser = ResponseProcessor(self.source, request)
195        parser.parse()
196        self.assertNotEqual(parser._get_name_id()[1], "bad")
197        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
198
199    @freeze_time("2014-07-17T01:02:18Z")
200    def test_verification_assertion_xsw_nested_duplicate_id(self):
201        """Nested-duplicate-ID XSW: a forged outer Assertion shares its ID with a
202        nested copy of the original signed Assertion (placed inside <saml:Advice>),
203        so the Signature's Reference URI (#ORIG_ID) matches the outer Assertion's
204        ID *and* dereferences to legitimately-signed content. Must be rejected."""
205        key = load_fixture("fixtures/signature_cert.pem")
206        kp = CertificateKeyPair.objects.create(
207            name=generate_id(),
208            certificate_data=key,
209        )
210        self.source.verification_kp = kp
211        self.source.signed_assertion = True
212        self.source.signed_response = False
213        request = self.factory.post(
214            "/",
215            data={
216                "SAMLResponse": b64encode(
217                    load_fixture("fixtures/response_signed_assertion_xsw_nested.xml").encode()
218                ).decode()
219            },
220        )
221
222        parser = ResponseProcessor(self.source, request)
223        with self.assertRaises(InvalidSignature):
224            parser.parse()
225
226    @freeze_time("2014-07-17T01:02:18Z")
227    def test_verification_response_uri_empty(self):
228        """Some real-world IdPs (notably some Okta dev-tenant configurations
229        observed in the gosaml2 testdata corpus at saml.oktadev.com) sign the
230        Response with ds:Reference URI="" instead of URI="#<ID>". Per xmldsig
231        §4.4.3.2, URI="" covers the entire enclosing document via the
232        enveloped-signature transform — strictly more attested content than
233        "#<ID>" — so consuming the target is a subset of what was signed."""
234        key = load_fixture("fixtures/signature_cert_uri_empty.pem")
235        kp = CertificateKeyPair.objects.create(
236            name=generate_id(),
237            certificate_data=key,
238        )
239        self.source.verification_kp = kp
240        self.source.signed_response = True
241        self.source.signed_assertion = False
242        request = self.factory.post(
243            "/",
244            data={
245                "SAMLResponse": b64encode(
246                    load_fixture("fixtures/response_signed_response_uri_empty.xml").encode()
247                ).decode()
248            },
249        )
250
251        parser = ResponseProcessor(self.source, request)
252        parser.parse()
253
254    @freeze_time("2014-07-17T01:02:18Z")
255    def test_verification_assertion_uri_empty(self):
256        """Symmetric to test_verification_response_uri_empty but for an
257        Assertion-level signature: the same xmldsig "this document" semantics
258        still cover the whole enclosing document, so the Assertion we then
259        consume is part of the attested content. We have no real-world IdP
260        samples emitting this configuration, but the pre-fix code accepted it
261        and the cryptographic guarantee holds, so keep accepting it rather
262        than risk breaking an IdP we haven't sampled."""
263        key = load_fixture("fixtures/signature_cert_assertion_uri_empty.pem")
264        kp = CertificateKeyPair.objects.create(
265            name=generate_id(),
266            certificate_data=key,
267        )
268        self.source.verification_kp = kp
269        self.source.signed_assertion = True
270        self.source.signed_response = False
271        request = self.factory.post(
272            "/",
273            data={
274                "SAMLResponse": b64encode(
275                    load_fixture("fixtures/response_signed_assertion_uri_empty.xml").encode()
276                ).decode()
277            },
278        )
279
280        parser = ResponseProcessor(self.source, request)
281        parser.parse()
282
283    @freeze_time("2014-07-17T01:02:18Z")
284    def test_verification_assertion_xsw3(self):
285        """XSW-3 (signature relocation): a forged Assertion contains a Signature whose
286        ds:Reference URI points to a second Assertion in the document. The signature
287        verifies (because the digest matches the legitimate referenced Assertion),
288        but the verifier must NOT then consume the forged Assertion as if it were
289        signed."""
290        key = load_fixture("fixtures/signature_cert.pem")
291        kp = CertificateKeyPair.objects.create(
292            name=generate_id(),
293            certificate_data=key,
294        )
295        self.source.verification_kp = kp
296        self.source.signed_assertion = True
297        self.source.signed_response = False
298        request = self.factory.post(
299            "/",
300            data={
301                "SAMLResponse": b64encode(
302                    load_fixture("fixtures/response_signed_assertion_xsw3.xml").encode()
303                ).decode()
304            },
305        )
306
307        parser = ResponseProcessor(self.source, request)
308        with self.assertRaises(InvalidSignature):
309            parser.parse()
310
311    @freeze_time("2014-07-17T01:02:18Z")
312    def test_name_id_comment(self):
313        """Test comment in name ID"""
314        fixture = load_fixture("fixtures/response_signed_assertion.xml")
315        fixture = fixture.replace(
316            "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7",
317            "_ce3d2948b4cf20146dee0a0b3dd6f<!--x-->69b6cf86f62d7",
318        )
319        key = load_fixture("fixtures/signature_cert.pem")
320        kp = CertificateKeyPair.objects.create(
321            name=generate_id(),
322            certificate_data=key,
323        )
324        self.source.verification_kp = kp
325        self.source.signed_assertion = True
326        self.source.signed_response = False
327        request = self.factory.post(
328            "/",
329            data={"SAMLResponse": b64encode(fixture.encode()).decode()},
330        )
331
332        parser = ResponseProcessor(self.source, request)
333        parser.parse()
334        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
335
336    @freeze_time("2014-07-17T01:02:18Z")
337    def test_verification_response(self):
338        """Test verifying signature inside response"""
339        key = load_fixture("fixtures/signature_cert.pem")
340        kp = CertificateKeyPair.objects.create(
341            name=generate_id(),
342            certificate_data=key,
343        )
344        self.source.verification_kp = kp
345        self.source.signed_response = True
346        self.source.signed_assertion = False
347        request = self.factory.post(
348            "/",
349            data={
350                "SAMLResponse": b64encode(
351                    load_fixture("fixtures/response_signed_response.xml").encode()
352                ).decode()
353            },
354        )
355
356        parser = ResponseProcessor(self.source, request)
357        parser.parse()
358
359    @freeze_time("2024-01-18T06:20:48Z")
360    def test_verification_response_and_assertion(self):
361        """Test verifying signature inside response and assertion"""
362        key = load_fixture("fixtures/signature_cert.pem")
363        kp = CertificateKeyPair.objects.create(
364            name=generate_id(),
365            certificate_data=key,
366        )
367        self.source.verification_kp = kp
368        self.source.signed_assertion = True
369        self.source.signed_response = True
370        request = self.factory.post(
371            "/",
372            data={
373                "SAMLResponse": b64encode(
374                    load_fixture("fixtures/response_signed_response_and_assertion.xml").encode()
375                ).decode()
376            },
377        )
378
379        parser = ResponseProcessor(self.source, request)
380        parser.parse()
381
382    def test_verification_wrong_signature(self):
383        """Test invalid signature fails"""
384        key = load_fixture("fixtures/signature_cert.pem")
385        kp = CertificateKeyPair.objects.create(
386            name=generate_id(),
387            certificate_data=key,
388        )
389        self.source.verification_kp = kp
390        self.source.signed_assertion = True
391        request = self.factory.post(
392            "/",
393            data={
394                "SAMLResponse": b64encode(
395                    # Same as response_signed_assertion.xml but the role name is altered
396                    load_fixture("fixtures/response_signed_error.xml").encode()
397                ).decode()
398            },
399        )
400
401        parser = ResponseProcessor(self.source, request)
402
403        with self.assertRaisesMessage(InvalidSignature, ""):
404            parser.parse()
405
406    @freeze_time("2022-10-14T14:15:00")
407    def test_verification_no_signature(self):
408        """Test rejecting response without signature when signed_assertion is True"""
409        key = load_fixture("fixtures/signature_cert.pem")
410        kp = CertificateKeyPair.objects.create(
411            name=generate_id(),
412            certificate_data=key,
413        )
414        self.source.verification_kp = kp
415        self.source.signed_assertion = True
416        request = self.factory.post(
417            "/",
418            data={
419                "SAMLResponse": b64encode(
420                    load_fixture("fixtures/response_success.xml").encode()
421                ).decode()
422            },
423        )
424
425        parser = ResponseProcessor(self.source, request)
426
427        with self.assertRaisesMessage(InvalidSignature, ""):
428            parser.parse()
429
430    def test_verification_incorrect_response(self):
431        """Test verifying signature inside response"""
432        key = load_fixture("fixtures/signature_cert.pem")
433        kp = CertificateKeyPair.objects.create(
434            name=generate_id(),
435            certificate_data=key,
436        )
437        self.source.verification_kp = kp
438        self.source.signed_response = True
439        self.source.signed_assertion = False
440        request = self.factory.post(
441            "/",
442            data={
443                "SAMLResponse": b64encode(
444                    load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
445                ).decode()
446            },
447        )
448
449        parser = ResponseProcessor(self.source, request)
450        with self.assertRaisesMessage(InvalidSignature, ""):
451            parser.parse()
452
453    @freeze_time("2025-10-30T05:45:47.619Z")
454    def test_signed_encrypted_response(self):
455        """Test signed & encrypted response"""
456        verification_key = load_fixture("fixtures/signature_cert2.pem")
457        vkp = CertificateKeyPair.objects.create(
458            name=generate_id(),
459            certificate_data=verification_key,
460        )
461
462        encrypted_key = load_fixture("fixtures/encrypted-key2.pem")
463        ekp = CertificateKeyPair.objects.create(name=generate_id(), key_data=encrypted_key)
464
465        self.source.verification_kp = vkp
466        self.source.encryption_kp = ekp
467        self.source.signed_response = True
468        self.source.signed_assertion = False
469        request = self.factory.post(
470            "/",
471            data={
472                "SAMLResponse": b64encode(
473                    load_fixture("fixtures/response_signed_encrypted.xml").encode()
474                ).decode()
475            },
476        )
477
478        parser = ResponseProcessor(self.source, request)
479        parser.parse()
480
481    @freeze_time("2026-01-21T14:23")
482    def test_transient(self):
483        """Test SAML transient NameID"""
484        verification_key = load_fixture("fixtures/signature_cert2.pem")
485        vkp = CertificateKeyPair.objects.create(
486            name=generate_id(),
487            certificate_data=verification_key,
488        )
489        self.source.verification_kp = vkp
490        self.source.signed_response = True
491        self.source.signed_assertion = False
492        request = self.factory.post(
493            "/",
494            data={
495                "SAMLResponse": b64encode(
496                    load_fixture("fixtures/response_transient.xml").encode()
497                ).decode()
498            },
499        )
500
501        parser = ResponseProcessor(self.source, request)
502        parser.parse()
503        parser.prepare_flow_manager()
class TestResponseProcessor(django.test.testcases.TestCase):
 18class TestResponseProcessor(TestCase):
 19    """Test ResponseProcessor"""
 20
 21    def setUp(self):
 22        self.factory = RequestFactory()
 23        self.source = SAMLSource.objects.create(
 24            name=generate_id(),
 25            slug=generate_id(),
 26            issuer="authentik",
 27            allow_idp_initiated=True,
 28            pre_authentication_flow=create_test_flow(),
 29        )
 30
 31    def test_status_error(self):
 32        """Test error status"""
 33        request = self.factory.post(
 34            "/",
 35            data={
 36                "SAMLResponse": b64encode(
 37                    load_fixture("fixtures/response_error.xml").encode()
 38                ).decode()
 39            },
 40        )
 41
 42        with self.assertRaisesMessage(
 43            ValueError,
 44            (
 45                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
 46                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
 47            ),
 48        ):
 49            ResponseProcessor(self.source, request).parse()
 50
 51    @freeze_time("2022-10-14T14:15:00")
 52    def test_success(self):
 53        """Test success"""
 54        request = self.factory.post(
 55            "/",
 56            data={
 57                "SAMLResponse": b64encode(
 58                    load_fixture("fixtures/response_success.xml").encode()
 59                ).decode()
 60            },
 61        )
 62
 63        parser = ResponseProcessor(self.source, request)
 64        parser.parse()
 65        sfm = parser.prepare_flow_manager()
 66        self.assertEqual(
 67            sfm.user_properties,
 68            {
 69                "email": "foo@bar.baz",
 70                "name": "foo",
 71                "sn": "bar",
 72                "username": "jens@goauthentik.io",
 73                "attributes": {},
 74                "path": self.source.get_user_path(),
 75            },
 76        )
 77
 78    @freeze_time("2022-10-14T14:16:40Z")
 79    def test_success_with_status_message_and_detail(self):
 80        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
 81        request = self.factory.post(
 82            "/",
 83            data={
 84                "SAMLResponse": b64encode(
 85                    load_fixture("fixtures/response_success_with_message.xml").encode()
 86                ).decode()
 87            },
 88        )
 89
 90        parser = ResponseProcessor(self.source, request)
 91        parser.parse()
 92        sfm = parser.prepare_flow_manager()
 93        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")
 94
 95    @freeze_time("2022-10-14T14:16:40Z")
 96    def test_error_with_message_and_detail(self):
 97        """Test error status with StatusMessage and StatusDetail includes both in error"""
 98        request = self.factory.post(
 99            "/",
100            data={
101                "SAMLResponse": b64encode(
102                    load_fixture("fixtures/response_error_with_detail.xml").encode()
103                ).decode()
104            },
105        )
106
107        with self.assertRaises(ValueError) as ctx:
108            ResponseProcessor(self.source, request).parse()
109        # Should contain both detail and message
110        self.assertIn("User account is disabled", str(ctx.exception))
111        self.assertIn("Authentication failed", str(ctx.exception))
112
113    @freeze_time("2024-08-07T15:48:09.325Z")
114    def test_encrypted_correct(self):
115        """Test encrypted"""
116        key = load_fixture("fixtures/encrypted-key.pem")
117        kp = CertificateKeyPair.objects.create(
118            name=generate_id(),
119            key_data=key,
120        )
121        self.source.encryption_kp = kp
122        request = self.factory.post(
123            "/",
124            data={
125                "SAMLResponse": b64encode(
126                    load_fixture("fixtures/response_encrypted.xml").encode()
127                ).decode()
128            },
129        )
130
131        parser = ResponseProcessor(self.source, request)
132        parser.parse()
133
134    def test_encrypted_incorrect_key(self):
135        """Test encrypted"""
136        kp = create_test_cert()
137        self.source.encryption_kp = kp
138        request = self.factory.post(
139            "/",
140            data={
141                "SAMLResponse": b64encode(
142                    load_fixture("fixtures/response_encrypted.xml").encode()
143                ).decode()
144            },
145        )
146
147        parser = ResponseProcessor(self.source, request)
148        with self.assertRaises(InvalidEncryption):
149            parser.parse()
150
151    @freeze_time("2022-10-14T14:16:40Z")
152    def test_verification_assertion(self):
153        """Test verifying signature inside assertion"""
154        key = load_fixture("fixtures/signature_cert.pem")
155        kp = CertificateKeyPair.objects.create(
156            name=generate_id(),
157            certificate_data=key,
158        )
159        self.source.verification_kp = kp
160        self.source.signed_assertion = True
161        self.source.signed_response = False
162        request = self.factory.post(
163            "/",
164            data={
165                "SAMLResponse": b64encode(
166                    load_fixture("fixtures/response_signed_assertion.xml").encode()
167                ).decode()
168            },
169        )
170
171        parser = ResponseProcessor(self.source, request)
172        parser.parse()
173
174    @freeze_time("2014-07-17T01:02:18Z")
175    def test_verification_assertion_duplicate(self):
176        """Test verifying signature inside assertion, where the response has another assertion
177        before our signed assertion"""
178        key = load_fixture("fixtures/signature_cert.pem")
179        kp = CertificateKeyPair.objects.create(
180            name=generate_id(),
181            certificate_data=key,
182        )
183        self.source.verification_kp = kp
184        self.source.signed_assertion = True
185        self.source.signed_response = False
186        request = self.factory.post(
187            "/",
188            data={
189                "SAMLResponse": b64encode(
190                    load_fixture("fixtures/response_signed_assertion_dup.xml").encode()
191                ).decode()
192            },
193        )
194
195        parser = ResponseProcessor(self.source, request)
196        parser.parse()
197        self.assertNotEqual(parser._get_name_id()[1], "bad")
198        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
199
200    @freeze_time("2014-07-17T01:02:18Z")
201    def test_verification_assertion_xsw_nested_duplicate_id(self):
202        """Nested-duplicate-ID XSW: a forged outer Assertion shares its ID with a
203        nested copy of the original signed Assertion (placed inside <saml:Advice>),
204        so the Signature's Reference URI (#ORIG_ID) matches the outer Assertion's
205        ID *and* dereferences to legitimately-signed content. Must be rejected."""
206        key = load_fixture("fixtures/signature_cert.pem")
207        kp = CertificateKeyPair.objects.create(
208            name=generate_id(),
209            certificate_data=key,
210        )
211        self.source.verification_kp = kp
212        self.source.signed_assertion = True
213        self.source.signed_response = False
214        request = self.factory.post(
215            "/",
216            data={
217                "SAMLResponse": b64encode(
218                    load_fixture("fixtures/response_signed_assertion_xsw_nested.xml").encode()
219                ).decode()
220            },
221        )
222
223        parser = ResponseProcessor(self.source, request)
224        with self.assertRaises(InvalidSignature):
225            parser.parse()
226
227    @freeze_time("2014-07-17T01:02:18Z")
228    def test_verification_response_uri_empty(self):
229        """Some real-world IdPs (notably some Okta dev-tenant configurations
230        observed in the gosaml2 testdata corpus at saml.oktadev.com) sign the
231        Response with ds:Reference URI="" instead of URI="#<ID>". Per xmldsig
232        §4.4.3.2, URI="" covers the entire enclosing document via the
233        enveloped-signature transform — strictly more attested content than
234        "#<ID>" — so consuming the target is a subset of what was signed."""
235        key = load_fixture("fixtures/signature_cert_uri_empty.pem")
236        kp = CertificateKeyPair.objects.create(
237            name=generate_id(),
238            certificate_data=key,
239        )
240        self.source.verification_kp = kp
241        self.source.signed_response = True
242        self.source.signed_assertion = False
243        request = self.factory.post(
244            "/",
245            data={
246                "SAMLResponse": b64encode(
247                    load_fixture("fixtures/response_signed_response_uri_empty.xml").encode()
248                ).decode()
249            },
250        )
251
252        parser = ResponseProcessor(self.source, request)
253        parser.parse()
254
255    @freeze_time("2014-07-17T01:02:18Z")
256    def test_verification_assertion_uri_empty(self):
257        """Symmetric to test_verification_response_uri_empty but for an
258        Assertion-level signature: the same xmldsig "this document" semantics
259        still cover the whole enclosing document, so the Assertion we then
260        consume is part of the attested content. We have no real-world IdP
261        samples emitting this configuration, but the pre-fix code accepted it
262        and the cryptographic guarantee holds, so keep accepting it rather
263        than risk breaking an IdP we haven't sampled."""
264        key = load_fixture("fixtures/signature_cert_assertion_uri_empty.pem")
265        kp = CertificateKeyPair.objects.create(
266            name=generate_id(),
267            certificate_data=key,
268        )
269        self.source.verification_kp = kp
270        self.source.signed_assertion = True
271        self.source.signed_response = False
272        request = self.factory.post(
273            "/",
274            data={
275                "SAMLResponse": b64encode(
276                    load_fixture("fixtures/response_signed_assertion_uri_empty.xml").encode()
277                ).decode()
278            },
279        )
280
281        parser = ResponseProcessor(self.source, request)
282        parser.parse()
283
284    @freeze_time("2014-07-17T01:02:18Z")
285    def test_verification_assertion_xsw3(self):
286        """XSW-3 (signature relocation): a forged Assertion contains a Signature whose
287        ds:Reference URI points to a second Assertion in the document. The signature
288        verifies (because the digest matches the legitimate referenced Assertion),
289        but the verifier must NOT then consume the forged Assertion as if it were
290        signed."""
291        key = load_fixture("fixtures/signature_cert.pem")
292        kp = CertificateKeyPair.objects.create(
293            name=generate_id(),
294            certificate_data=key,
295        )
296        self.source.verification_kp = kp
297        self.source.signed_assertion = True
298        self.source.signed_response = False
299        request = self.factory.post(
300            "/",
301            data={
302                "SAMLResponse": b64encode(
303                    load_fixture("fixtures/response_signed_assertion_xsw3.xml").encode()
304                ).decode()
305            },
306        )
307
308        parser = ResponseProcessor(self.source, request)
309        with self.assertRaises(InvalidSignature):
310            parser.parse()
311
312    @freeze_time("2014-07-17T01:02:18Z")
313    def test_name_id_comment(self):
314        """Test comment in name ID"""
315        fixture = load_fixture("fixtures/response_signed_assertion.xml")
316        fixture = fixture.replace(
317            "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7",
318            "_ce3d2948b4cf20146dee0a0b3dd6f<!--x-->69b6cf86f62d7",
319        )
320        key = load_fixture("fixtures/signature_cert.pem")
321        kp = CertificateKeyPair.objects.create(
322            name=generate_id(),
323            certificate_data=key,
324        )
325        self.source.verification_kp = kp
326        self.source.signed_assertion = True
327        self.source.signed_response = False
328        request = self.factory.post(
329            "/",
330            data={"SAMLResponse": b64encode(fixture.encode()).decode()},
331        )
332
333        parser = ResponseProcessor(self.source, request)
334        parser.parse()
335        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")
336
337    @freeze_time("2014-07-17T01:02:18Z")
338    def test_verification_response(self):
339        """Test verifying signature inside response"""
340        key = load_fixture("fixtures/signature_cert.pem")
341        kp = CertificateKeyPair.objects.create(
342            name=generate_id(),
343            certificate_data=key,
344        )
345        self.source.verification_kp = kp
346        self.source.signed_response = True
347        self.source.signed_assertion = False
348        request = self.factory.post(
349            "/",
350            data={
351                "SAMLResponse": b64encode(
352                    load_fixture("fixtures/response_signed_response.xml").encode()
353                ).decode()
354            },
355        )
356
357        parser = ResponseProcessor(self.source, request)
358        parser.parse()
359
360    @freeze_time("2024-01-18T06:20:48Z")
361    def test_verification_response_and_assertion(self):
362        """Test verifying signature inside response and assertion"""
363        key = load_fixture("fixtures/signature_cert.pem")
364        kp = CertificateKeyPair.objects.create(
365            name=generate_id(),
366            certificate_data=key,
367        )
368        self.source.verification_kp = kp
369        self.source.signed_assertion = True
370        self.source.signed_response = True
371        request = self.factory.post(
372            "/",
373            data={
374                "SAMLResponse": b64encode(
375                    load_fixture("fixtures/response_signed_response_and_assertion.xml").encode()
376                ).decode()
377            },
378        )
379
380        parser = ResponseProcessor(self.source, request)
381        parser.parse()
382
383    def test_verification_wrong_signature(self):
384        """Test invalid signature fails"""
385        key = load_fixture("fixtures/signature_cert.pem")
386        kp = CertificateKeyPair.objects.create(
387            name=generate_id(),
388            certificate_data=key,
389        )
390        self.source.verification_kp = kp
391        self.source.signed_assertion = True
392        request = self.factory.post(
393            "/",
394            data={
395                "SAMLResponse": b64encode(
396                    # Same as response_signed_assertion.xml but the role name is altered
397                    load_fixture("fixtures/response_signed_error.xml").encode()
398                ).decode()
399            },
400        )
401
402        parser = ResponseProcessor(self.source, request)
403
404        with self.assertRaisesMessage(InvalidSignature, ""):
405            parser.parse()
406
407    @freeze_time("2022-10-14T14:15:00")
408    def test_verification_no_signature(self):
409        """Test rejecting response without signature when signed_assertion is True"""
410        key = load_fixture("fixtures/signature_cert.pem")
411        kp = CertificateKeyPair.objects.create(
412            name=generate_id(),
413            certificate_data=key,
414        )
415        self.source.verification_kp = kp
416        self.source.signed_assertion = True
417        request = self.factory.post(
418            "/",
419            data={
420                "SAMLResponse": b64encode(
421                    load_fixture("fixtures/response_success.xml").encode()
422                ).decode()
423            },
424        )
425
426        parser = ResponseProcessor(self.source, request)
427
428        with self.assertRaisesMessage(InvalidSignature, ""):
429            parser.parse()
430
431    def test_verification_incorrect_response(self):
432        """Test verifying signature inside response"""
433        key = load_fixture("fixtures/signature_cert.pem")
434        kp = CertificateKeyPair.objects.create(
435            name=generate_id(),
436            certificate_data=key,
437        )
438        self.source.verification_kp = kp
439        self.source.signed_response = True
440        self.source.signed_assertion = False
441        request = self.factory.post(
442            "/",
443            data={
444                "SAMLResponse": b64encode(
445                    load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
446                ).decode()
447            },
448        )
449
450        parser = ResponseProcessor(self.source, request)
451        with self.assertRaisesMessage(InvalidSignature, ""):
452            parser.parse()
453
454    @freeze_time("2025-10-30T05:45:47.619Z")
455    def test_signed_encrypted_response(self):
456        """Test signed & encrypted response"""
457        verification_key = load_fixture("fixtures/signature_cert2.pem")
458        vkp = CertificateKeyPair.objects.create(
459            name=generate_id(),
460            certificate_data=verification_key,
461        )
462
463        encrypted_key = load_fixture("fixtures/encrypted-key2.pem")
464        ekp = CertificateKeyPair.objects.create(name=generate_id(), key_data=encrypted_key)
465
466        self.source.verification_kp = vkp
467        self.source.encryption_kp = ekp
468        self.source.signed_response = True
469        self.source.signed_assertion = False
470        request = self.factory.post(
471            "/",
472            data={
473                "SAMLResponse": b64encode(
474                    load_fixture("fixtures/response_signed_encrypted.xml").encode()
475                ).decode()
476            },
477        )
478
479        parser = ResponseProcessor(self.source, request)
480        parser.parse()
481
482    @freeze_time("2026-01-21T14:23")
483    def test_transient(self):
484        """Test SAML transient NameID"""
485        verification_key = load_fixture("fixtures/signature_cert2.pem")
486        vkp = CertificateKeyPair.objects.create(
487            name=generate_id(),
488            certificate_data=verification_key,
489        )
490        self.source.verification_kp = vkp
491        self.source.signed_response = True
492        self.source.signed_assertion = False
493        request = self.factory.post(
494            "/",
495            data={
496                "SAMLResponse": b64encode(
497                    load_fixture("fixtures/response_transient.xml").encode()
498                ).decode()
499            },
500        )
501
502        parser = ResponseProcessor(self.source, request)
503        parser.parse()
504        parser.prepare_flow_manager()

Test ResponseProcessor

def setUp(self):
21    def setUp(self):
22        self.factory = RequestFactory()
23        self.source = SAMLSource.objects.create(
24            name=generate_id(),
25            slug=generate_id(),
26            issuer="authentik",
27            allow_idp_initiated=True,
28            pre_authentication_flow=create_test_flow(),
29        )

Hook method for setting up the test fixture before exercising it.

def test_status_error(self):
31    def test_status_error(self):
32        """Test error status"""
33        request = self.factory.post(
34            "/",
35            data={
36                "SAMLResponse": b64encode(
37                    load_fixture("fixtures/response_error.xml").encode()
38                ).decode()
39            },
40        )
41
42        with self.assertRaisesMessage(
43            ValueError,
44            (
45                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
46                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
47            ),
48        ):
49            ResponseProcessor(self.source, request).parse()

Test error status

@freeze_time('2022-10-14T14:15:00')
def test_success(self):
51    @freeze_time("2022-10-14T14:15:00")
52    def test_success(self):
53        """Test success"""
54        request = self.factory.post(
55            "/",
56            data={
57                "SAMLResponse": b64encode(
58                    load_fixture("fixtures/response_success.xml").encode()
59                ).decode()
60            },
61        )
62
63        parser = ResponseProcessor(self.source, request)
64        parser.parse()
65        sfm = parser.prepare_flow_manager()
66        self.assertEqual(
67            sfm.user_properties,
68            {
69                "email": "foo@bar.baz",
70                "name": "foo",
71                "sn": "bar",
72                "username": "jens@goauthentik.io",
73                "attributes": {},
74                "path": self.source.get_user_path(),
75            },
76        )

Test success

@freeze_time('2022-10-14T14:16:40Z')
def test_success_with_status_message_and_detail(self):
78    @freeze_time("2022-10-14T14:16:40Z")
79    def test_success_with_status_message_and_detail(self):
80        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
81        request = self.factory.post(
82            "/",
83            data={
84                "SAMLResponse": b64encode(
85                    load_fixture("fixtures/response_success_with_message.xml").encode()
86                ).decode()
87            },
88        )
89
90        parser = ResponseProcessor(self.source, request)
91        parser.parse()
92        sfm = parser.prepare_flow_manager()
93        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")

Test success with StatusMessage and StatusDetail present (should not raise error)

@freeze_time('2022-10-14T14:16:40Z')
def test_error_with_message_and_detail(self):
 95    @freeze_time("2022-10-14T14:16:40Z")
 96    def test_error_with_message_and_detail(self):
 97        """Test error status with StatusMessage and StatusDetail includes both in error"""
 98        request = self.factory.post(
 99            "/",
100            data={
101                "SAMLResponse": b64encode(
102                    load_fixture("fixtures/response_error_with_detail.xml").encode()
103                ).decode()
104            },
105        )
106
107        with self.assertRaises(ValueError) as ctx:
108            ResponseProcessor(self.source, request).parse()
109        # Should contain both detail and message
110        self.assertIn("User account is disabled", str(ctx.exception))
111        self.assertIn("Authentication failed", str(ctx.exception))

Test error status with StatusMessage and StatusDetail includes both in error

@freeze_time('2024-08-07T15:48:09.325Z')
def test_encrypted_correct(self):
113    @freeze_time("2024-08-07T15:48:09.325Z")
114    def test_encrypted_correct(self):
115        """Test encrypted"""
116        key = load_fixture("fixtures/encrypted-key.pem")
117        kp = CertificateKeyPair.objects.create(
118            name=generate_id(),
119            key_data=key,
120        )
121        self.source.encryption_kp = kp
122        request = self.factory.post(
123            "/",
124            data={
125                "SAMLResponse": b64encode(
126                    load_fixture("fixtures/response_encrypted.xml").encode()
127                ).decode()
128            },
129        )
130
131        parser = ResponseProcessor(self.source, request)
132        parser.parse()

Test encrypted

def test_encrypted_incorrect_key(self):
134    def test_encrypted_incorrect_key(self):
135        """Test encrypted"""
136        kp = create_test_cert()
137        self.source.encryption_kp = kp
138        request = self.factory.post(
139            "/",
140            data={
141                "SAMLResponse": b64encode(
142                    load_fixture("fixtures/response_encrypted.xml").encode()
143                ).decode()
144            },
145        )
146
147        parser = ResponseProcessor(self.source, request)
148        with self.assertRaises(InvalidEncryption):
149            parser.parse()

Test encrypted response with an incorrect key is rejected

@freeze_time('2022-10-14T14:16:40Z')
def test_verification_assertion(self):
151    @freeze_time("2022-10-14T14:16:40Z")
152    def test_verification_assertion(self):
153        """Test verifying signature inside assertion"""
154        key = load_fixture("fixtures/signature_cert.pem")
155        kp = CertificateKeyPair.objects.create(
156            name=generate_id(),
157            certificate_data=key,
158        )
159        self.source.verification_kp = kp
160        self.source.signed_assertion = True
161        self.source.signed_response = False
162        request = self.factory.post(
163            "/",
164            data={
165                "SAMLResponse": b64encode(
166                    load_fixture("fixtures/response_signed_assertion.xml").encode()
167                ).decode()
168            },
169        )
170
171        parser = ResponseProcessor(self.source, request)
172        parser.parse()

Test verifying signature inside assertion

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_assertion_duplicate(self):
174    @freeze_time("2014-07-17T01:02:18Z")
175    def test_verification_assertion_duplicate(self):
176        """Test verifying signature inside assertion, where the response has another assertion
177        before our signed assertion"""
178        key = load_fixture("fixtures/signature_cert.pem")
179        kp = CertificateKeyPair.objects.create(
180            name=generate_id(),
181            certificate_data=key,
182        )
183        self.source.verification_kp = kp
184        self.source.signed_assertion = True
185        self.source.signed_response = False
186        request = self.factory.post(
187            "/",
188            data={
189                "SAMLResponse": b64encode(
190                    load_fixture("fixtures/response_signed_assertion_dup.xml").encode()
191                ).decode()
192            },
193        )
194
195        parser = ResponseProcessor(self.source, request)
196        parser.parse()
197        self.assertNotEqual(parser._get_name_id()[1], "bad")
198        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")

Test verifying signature inside assertion, where the response has another assertion before our signed assertion

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_assertion_xsw_nested_duplicate_id(self):
200    @freeze_time("2014-07-17T01:02:18Z")
201    def test_verification_assertion_xsw_nested_duplicate_id(self):
202        """Nested-duplicate-ID XSW: a forged outer Assertion shares its ID with a
203        nested copy of the original signed Assertion (placed inside <saml:Advice>),
204        so the Signature's Reference URI (#ORIG_ID) matches the outer Assertion's
205        ID *and* dereferences to legitimately-signed content. Must be rejected."""
206        key = load_fixture("fixtures/signature_cert.pem")
207        kp = CertificateKeyPair.objects.create(
208            name=generate_id(),
209            certificate_data=key,
210        )
211        self.source.verification_kp = kp
212        self.source.signed_assertion = True
213        self.source.signed_response = False
214        request = self.factory.post(
215            "/",
216            data={
217                "SAMLResponse": b64encode(
218                    load_fixture("fixtures/response_signed_assertion_xsw_nested.xml").encode()
219                ).decode()
220            },
221        )
222
223        parser = ResponseProcessor(self.source, request)
224        with self.assertRaises(InvalidSignature):
225            parser.parse()

Nested-duplicate-ID XSW: a forged outer Assertion shares its ID with a nested copy of the original signed Assertion (placed inside <saml:Advice>), so the Signature's Reference URI (#ORIG_ID) matches the outer Assertion's ID *and* dereferences to legitimately-signed content. Must be rejected.

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_response_uri_empty(self):
227    @freeze_time("2014-07-17T01:02:18Z")
228    def test_verification_response_uri_empty(self):
229        """Some real-world IdPs (notably some Okta dev-tenant configurations
230        observed in the gosaml2 testdata corpus at saml.oktadev.com) sign the
231        Response with ds:Reference URI="" instead of URI="#<ID>". Per xmldsig
232        §4.4.3.2, URI="" covers the entire enclosing document via the
233        enveloped-signature transform — strictly more attested content than
234        "#<ID>" — so consuming the target is a subset of what was signed."""
235        key = load_fixture("fixtures/signature_cert_uri_empty.pem")
236        kp = CertificateKeyPair.objects.create(
237            name=generate_id(),
238            certificate_data=key,
239        )
240        self.source.verification_kp = kp
241        self.source.signed_response = True
242        self.source.signed_assertion = False
243        request = self.factory.post(
244            "/",
245            data={
246                "SAMLResponse": b64encode(
247                    load_fixture("fixtures/response_signed_response_uri_empty.xml").encode()
248                ).decode()
249            },
250        )
251
252        parser = ResponseProcessor(self.source, request)
253        parser.parse()

Some real-world IdPs (notably some Okta dev-tenant configurations observed in the gosaml2 testdata corpus at saml.oktadev.com) sign the Response with ds:Reference URI="" instead of URI="#<ID>". Per xmldsig §4.4.3.2, URI="" covers the entire enclosing document via the enveloped-signature transform — strictly more attested content than "#<ID>" — so consuming the target is a subset of what was signed.

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_assertion_uri_empty(self):
255    @freeze_time("2014-07-17T01:02:18Z")
256    def test_verification_assertion_uri_empty(self):
257        """Symmetric to test_verification_response_uri_empty but for an
258        Assertion-level signature: the same xmldsig "this document" semantics
259        still cover the whole enclosing document, so the Assertion we then
260        consume is part of the attested content. We have no real-world IdP
261        samples emitting this configuration, but the pre-fix code accepted it
262        and the cryptographic guarantee holds, so keep accepting it rather
263        than risk breaking an IdP we haven't sampled."""
264        key = load_fixture("fixtures/signature_cert_assertion_uri_empty.pem")
265        kp = CertificateKeyPair.objects.create(
266            name=generate_id(),
267            certificate_data=key,
268        )
269        self.source.verification_kp = kp
270        self.source.signed_assertion = True
271        self.source.signed_response = False
272        request = self.factory.post(
273            "/",
274            data={
275                "SAMLResponse": b64encode(
276                    load_fixture("fixtures/response_signed_assertion_uri_empty.xml").encode()
277                ).decode()
278            },
279        )
280
281        parser = ResponseProcessor(self.source, request)
282        parser.parse()

Symmetric to test_verification_response_uri_empty but for an Assertion-level signature: the same xmldsig "this document" semantics still cover the whole enclosing document, so the Assertion we then consume is part of the attested content. We have no real-world IdP samples emitting this configuration, but the pre-fix code accepted it and the cryptographic guarantee holds, so keep accepting it rather than risk breaking an IdP we haven't sampled.

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_assertion_xsw3(self):
284    @freeze_time("2014-07-17T01:02:18Z")
285    def test_verification_assertion_xsw3(self):
286        """XSW-3 (signature relocation): a forged Assertion contains a Signature whose
287        ds:Reference URI points to a second Assertion in the document. The signature
288        verifies (because the digest matches the legitimate referenced Assertion),
289        but the verifier must NOT then consume the forged Assertion as if it were
290        signed."""
291        key = load_fixture("fixtures/signature_cert.pem")
292        kp = CertificateKeyPair.objects.create(
293            name=generate_id(),
294            certificate_data=key,
295        )
296        self.source.verification_kp = kp
297        self.source.signed_assertion = True
298        self.source.signed_response = False
299        request = self.factory.post(
300            "/",
301            data={
302                "SAMLResponse": b64encode(
303                    load_fixture("fixtures/response_signed_assertion_xsw3.xml").encode()
304                ).decode()
305            },
306        )
307
308        parser = ResponseProcessor(self.source, request)
309        with self.assertRaises(InvalidSignature):
310            parser.parse()

XSW-3 (signature relocation): a forged Assertion contains a Signature whose ds:Reference URI points to a second Assertion in the document. The signature verifies (because the digest matches the legitimate referenced Assertion), but the verifier must NOT then consume the forged Assertion as if it were signed.

@freeze_time('2014-07-17T01:02:18Z')
def test_name_id_comment(self):
312    @freeze_time("2014-07-17T01:02:18Z")
313    def test_name_id_comment(self):
314        """Test comment in name ID"""
315        fixture = load_fixture("fixtures/response_signed_assertion.xml")
316        fixture = fixture.replace(
317            "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7",
318            "_ce3d2948b4cf20146dee0a0b3dd6f<!--x-->69b6cf86f62d7",
319        )
320        key = load_fixture("fixtures/signature_cert.pem")
321        kp = CertificateKeyPair.objects.create(
322            name=generate_id(),
323            certificate_data=key,
324        )
325        self.source.verification_kp = kp
326        self.source.signed_assertion = True
327        self.source.signed_response = False
328        request = self.factory.post(
329            "/",
330            data={"SAMLResponse": b64encode(fixture.encode()).decode()},
331        )
332
333        parser = ResponseProcessor(self.source, request)
334        parser.parse()
335        self.assertEqual(parser._get_name_id()[1], "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")

Test comment in name ID

@freeze_time('2014-07-17T01:02:18Z')
def test_verification_response(self):
337    @freeze_time("2014-07-17T01:02:18Z")
338    def test_verification_response(self):
339        """Test verifying signature inside response"""
340        key = load_fixture("fixtures/signature_cert.pem")
341        kp = CertificateKeyPair.objects.create(
342            name=generate_id(),
343            certificate_data=key,
344        )
345        self.source.verification_kp = kp
346        self.source.signed_response = True
347        self.source.signed_assertion = False
348        request = self.factory.post(
349            "/",
350            data={
351                "SAMLResponse": b64encode(
352                    load_fixture("fixtures/response_signed_response.xml").encode()
353                ).decode()
354            },
355        )
356
357        parser = ResponseProcessor(self.source, request)
358        parser.parse()

Test verifying signature inside response

@freeze_time('2024-01-18T06:20:48Z')
def test_verification_response_and_assertion(self):
360    @freeze_time("2024-01-18T06:20:48Z")
361    def test_verification_response_and_assertion(self):
362        """Test verifying signature inside response and assertion"""
363        key = load_fixture("fixtures/signature_cert.pem")
364        kp = CertificateKeyPair.objects.create(
365            name=generate_id(),
366            certificate_data=key,
367        )
368        self.source.verification_kp = kp
369        self.source.signed_assertion = True
370        self.source.signed_response = True
371        request = self.factory.post(
372            "/",
373            data={
374                "SAMLResponse": b64encode(
375                    load_fixture("fixtures/response_signed_response_and_assertion.xml").encode()
376                ).decode()
377            },
378        )
379
380        parser = ResponseProcessor(self.source, request)
381        parser.parse()

Test verifying signature inside response and assertion

def test_verification_wrong_signature(self):
383    def test_verification_wrong_signature(self):
384        """Test invalid signature fails"""
385        key = load_fixture("fixtures/signature_cert.pem")
386        kp = CertificateKeyPair.objects.create(
387            name=generate_id(),
388            certificate_data=key,
389        )
390        self.source.verification_kp = kp
391        self.source.signed_assertion = True
392        request = self.factory.post(
393            "/",
394            data={
395                "SAMLResponse": b64encode(
396                    # Same as response_signed_assertion.xml but the role name is altered
397                    load_fixture("fixtures/response_signed_error.xml").encode()
398                ).decode()
399            },
400        )
401
402        parser = ResponseProcessor(self.source, request)
403
404        with self.assertRaisesMessage(InvalidSignature, ""):
405            parser.parse()

Test invalid signature fails

@freeze_time('2022-10-14T14:15:00')
def test_verification_no_signature(self):
407    @freeze_time("2022-10-14T14:15:00")
408    def test_verification_no_signature(self):
409        """Test rejecting response without signature when signed_assertion is True"""
410        key = load_fixture("fixtures/signature_cert.pem")
411        kp = CertificateKeyPair.objects.create(
412            name=generate_id(),
413            certificate_data=key,
414        )
415        self.source.verification_kp = kp
416        self.source.signed_assertion = True
417        request = self.factory.post(
418            "/",
419            data={
420                "SAMLResponse": b64encode(
421                    load_fixture("fixtures/response_success.xml").encode()
422                ).decode()
423            },
424        )
425
426        parser = ResponseProcessor(self.source, request)
427
428        with self.assertRaisesMessage(InvalidSignature, ""):
429            parser.parse()

Test rejecting response without signature when signed_assertion is True

def test_verification_incorrect_response(self):
431    def test_verification_incorrect_response(self):
432        """Test verifying signature inside response"""
433        key = load_fixture("fixtures/signature_cert.pem")
434        kp = CertificateKeyPair.objects.create(
435            name=generate_id(),
436            certificate_data=key,
437        )
438        self.source.verification_kp = kp
439        self.source.signed_response = True
440        self.source.signed_assertion = False
441        request = self.factory.post(
442            "/",
443            data={
444                "SAMLResponse": b64encode(
445                    load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
446                ).decode()
447            },
448        )
449
450        parser = ResponseProcessor(self.source, request)
451        with self.assertRaisesMessage(InvalidSignature, ""):
452            parser.parse()

Test that a response with an incorrect signature is rejected

@freeze_time('2025-10-30T05:45:47.619Z')
def test_signed_encrypted_response(self):
454    @freeze_time("2025-10-30T05:45:47.619Z")
455    def test_signed_encrypted_response(self):
456        """Test signed & encrypted response"""
457        verification_key = load_fixture("fixtures/signature_cert2.pem")
458        vkp = CertificateKeyPair.objects.create(
459            name=generate_id(),
460            certificate_data=verification_key,
461        )
462
463        encrypted_key = load_fixture("fixtures/encrypted-key2.pem")
464        ekp = CertificateKeyPair.objects.create(name=generate_id(), key_data=encrypted_key)
465
466        self.source.verification_kp = vkp
467        self.source.encryption_kp = ekp
468        self.source.signed_response = True
469        self.source.signed_assertion = False
470        request = self.factory.post(
471            "/",
472            data={
473                "SAMLResponse": b64encode(
474                    load_fixture("fixtures/response_signed_encrypted.xml").encode()
475                ).decode()
476            },
477        )
478
479        parser = ResponseProcessor(self.source, request)
480        parser.parse()

Test signed & encrypted response

@freeze_time('2026-01-21T14:23')
def test_transient(self):
482    @freeze_time("2026-01-21T14:23")
483    def test_transient(self):
484        """Test SAML transient NameID"""
485        verification_key = load_fixture("fixtures/signature_cert2.pem")
486        vkp = CertificateKeyPair.objects.create(
487            name=generate_id(),
488            certificate_data=verification_key,
489        )
490        self.source.verification_kp = vkp
491        self.source.signed_response = True
492        self.source.signed_assertion = False
493        request = self.factory.post(
494            "/",
495            data={
496                "SAMLResponse": b64encode(
497                    load_fixture("fixtures/response_transient.xml").encode()
498                ).decode()
499            },
500        )
501
502        parser = ResponseProcessor(self.source, request)
503        parser.parse()
504        parser.prepare_flow_manager()

Test SAML transient NameID