authentik.sources.saml.tests.test_response
SAML Source tests
"""SAML Source tests"""

from base64 import b64encode

from django.test import TestCase

from authentik.core.tests.utils import RequestFactory, create_test_cert, create_test_flow
from authentik.crypto.models import CertificateKeyPair
from authentik.lib.generators import generate_id
from authentik.lib.tests.utils import load_fixture
from authentik.sources.saml.exceptions import InvalidEncryption, InvalidSignature
from authentik.sources.saml.models import SAMLSource
from authentik.sources.saml.processors.response import ResponseProcessor


class TestResponseProcessor(TestCase):
    """Test ResponseProcessor"""

    def setUp(self):
        """Create the request factory and the SAML source used by every test."""
        self.factory = RequestFactory()
        self.source = SAMLSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            issuer="authentik",
            allow_idp_initiated=True,
            pre_authentication_flow=create_test_flow(),
        )

    def _request(self, fixture):
        """Return a POST request whose ``SAMLResponse`` field is the base64-encoded fixture."""
        return self.factory.post(
            "/",
            data={"SAMLResponse": b64encode(load_fixture(fixture).encode()).decode()},
        )

    def _processor(self, fixture):
        """Return a ResponseProcessor for ``self.source`` fed with the given response fixture.

        Call this only after ``self.source`` has been configured for the test.
        """
        return ResponseProcessor(self.source, self._request(fixture))

    def _use_verification_cert(self, cert_fixture):
        """Store the certificate fixture as a key pair and set it as the verification key pair."""
        self.source.verification_kp = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data=load_fixture(cert_fixture),
        )

    def _use_encryption_key(self, key_fixture):
        """Store the private key fixture as a key pair and set it as the encryption key pair."""
        self.source.encryption_kp = CertificateKeyPair.objects.create(
            name=generate_id(),
            key_data=load_fixture(key_fixture),
        )

    def test_status_error(self):
        """Test error status"""
        request = self._request("fixtures/response_error.xml")

        with self.assertRaisesMessage(
            ValueError,
            (
                "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
                "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
            ),
        ):
            ResponseProcessor(self.source, request).parse()

    def test_success(self):
        """Test success"""
        parser = self._processor("fixtures/response_success.xml")
        parser.parse()
        sfm = parser.prepare_flow_manager()
        self.assertEqual(
            sfm.user_properties,
            {
                "email": "foo@bar.baz",
                "name": "foo",
                "sn": "bar",
                "username": "jens@goauthentik.io",
                "attributes": {},
                "path": self.source.get_user_path(),
            },
        )

    def test_success_with_status_message_and_detail(self):
        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
        parser = self._processor("fixtures/response_success_with_message.xml")
        parser.parse()
        sfm = parser.prepare_flow_manager()
        self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io")

    def test_error_with_message_and_detail(self):
        """Test error status with StatusMessage and StatusDetail includes both in error"""
        request = self._request("fixtures/response_error_with_detail.xml")

        with self.assertRaises(ValueError) as ctx:
            ResponseProcessor(self.source, request).parse()
        # Should contain both detail and message
        self.assertIn("User account is disabled", str(ctx.exception))
        self.assertIn("Authentication failed", str(ctx.exception))

    def test_encrypted_correct(self):
        """Test encrypted response with the matching decryption key"""
        self._use_encryption_key("fixtures/encrypted-key.pem")

        self._processor("fixtures/response_encrypted.xml").parse()

    def test_encrypted_incorrect_key(self):
        """Test encrypted response with a non-matching key raises InvalidEncryption"""
        self.source.encryption_kp = create_test_cert()

        parser = self._processor("fixtures/response_encrypted.xml")
        with self.assertRaises(InvalidEncryption):
            parser.parse()

    def test_verification_assertion(self):
        """Test verifying signature inside assertion"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_assertion = True
        self.source.signed_response = False

        self._processor("fixtures/response_signed_assertion.xml").parse()

    def test_verification_assertion_duplicate(self):
        """Test verifying signature inside assertion, where the response has another assertion
        before our signed assertion"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_assertion = True
        self.source.signed_response = False

        parser = self._processor("fixtures/response_signed_assertion_dup.xml")
        parser.parse()
        self.assertNotEqual(parser._get_name_id().text, "bad")
        self.assertEqual(parser._get_name_id().text, "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7")

    def test_verification_response(self):
        """Test verifying signature inside response"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_response = True
        self.source.signed_assertion = False

        self._processor("fixtures/response_signed_response.xml").parse()

    def test_verification_response_and_assertion(self):
        """Test verifying signature inside response and assertion"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_assertion = True
        self.source.signed_response = True

        self._processor("fixtures/response_signed_response_and_assertion.xml").parse()

    def test_verification_wrong_signature(self):
        """Test invalid signature fails"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_assertion = True

        # Same as response_signed_assertion.xml but the role name is altered
        parser = self._processor("fixtures/response_signed_error.xml")

        # assertRaisesMessage with an empty message matches any text, so plain
        # assertRaises expresses the same check.
        with self.assertRaises(InvalidSignature):
            parser.parse()

    def test_verification_no_signature(self):
        """Test rejecting response without signature when signed_assertion is True"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_assertion = True

        parser = self._processor("fixtures/response_success.xml")

        with self.assertRaises(InvalidSignature):
            parser.parse()

    def test_verification_incorrect_response(self):
        """Test that a response whose signature does not verify is rejected"""
        self._use_verification_cert("fixtures/signature_cert.pem")
        self.source.signed_response = True
        self.source.signed_assertion = False

        parser = self._processor("fixtures/response_incorrect_signed_response.xml")
        with self.assertRaises(InvalidSignature):
            parser.parse()

    def test_signed_encrypted_response(self):
        """Test signed & encrypted response"""
        self._use_verification_cert("fixtures/signature_cert2.pem")
        self._use_encryption_key("fixtures/encrypted-key2.pem")
        self.source.signed_response = True
        self.source.signed_assertion = False

        self._processor("fixtures/response_signed_encrypted.xml").parse()

    def test_transient(self):
        """Test SAML transient NameID"""
        self._use_verification_cert("fixtures/signature_cert2.pem")
        self.source.signed_response = True
        self.source.signed_assertion = False

        parser = self._processor("fixtures/response_transient.xml")
        parser.parse()
        parser.prepare_flow_manager()
class TestResponseProcessor(TestCase):
    """Test ResponseProcessor"""

    def setUp(self):
        """Create a request factory and a new SAML source for the test."""
        self.factory = RequestFactory()
        self.source = SAMLSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            issuer="authentik",
            allow_idp_initiated=True,
            pre_authentication_flow=create_test_flow(),
        )

    def _post_fixture(self, fixture_path):
        """Return a POST request carrying the fixture, base64-encoded, as ``SAMLResponse``."""
        encoded = b64encode(load_fixture(fixture_path).encode()).decode()
        return self.factory.post("/", data={"SAMLResponse": encoded})

    def test_status_error(self):
        """Test error status"""
        request = self._post_fixture("fixtures/response_error.xml")
        expected_message = (
            "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
            "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
        )

        with self.assertRaisesMessage(ValueError, expected_message):
            ResponseProcessor(self.source, request).parse()

    def test_success(self):
        """Test success"""
        request = self._post_fixture("fixtures/response_success.xml")

        processor = ResponseProcessor(self.source, request)
        processor.parse()
        flow_manager = processor.prepare_flow_manager()
        expected_properties = {
            "email": "foo@bar.baz",
            "name": "foo",
            "sn": "bar",
            "username": "jens@goauthentik.io",
            "attributes": {},
            "path": self.source.get_user_path(),
        }
        self.assertEqual(flow_manager.user_properties, expected_properties)

    def test_success_with_status_message_and_detail(self):
        """Test success with StatusMessage and StatusDetail present (should not raise error)"""
        request = self._post_fixture("fixtures/response_success_with_message.xml")

        processor = ResponseProcessor(self.source, request)
        processor.parse()
        flow_manager = processor.prepare_flow_manager()
        self.assertEqual(flow_manager.user_properties["username"], "jens@goauthentik.io")

    def test_error_with_message_and_detail(self):
        """Test error status with StatusMessage and StatusDetail includes both in error"""
        request = self._post_fixture("fixtures/response_error_with_detail.xml")

        with self.assertRaises(ValueError) as ctx:
            ResponseProcessor(self.source, request).parse()
        # Should contain both detail and message
        error_text = str(ctx.exception)
        self.assertIn("User account is disabled", error_text)
        self.assertIn("Authentication failed", error_text)

    def test_encrypted_correct(self):
        """Test encrypted"""
        private_key = load_fixture("fixtures/encrypted-key.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), key_data=private_key)
        self.source.encryption_kp = keypair
        request = self._post_fixture("fixtures/response_encrypted.xml")

        ResponseProcessor(self.source, request).parse()

    def test_encrypted_incorrect_key(self):
        """Test encrypted response with a key pair that does not match"""
        self.source.encryption_kp = create_test_cert()
        request = self._post_fixture("fixtures/response_encrypted.xml")

        processor = ResponseProcessor(self.source, request)
        with self.assertRaises(InvalidEncryption):
            processor.parse()

    def test_verification_assertion(self):
        """Test verifying signature inside assertion"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_assertion = True
        self.source.signed_response = False
        request = self._post_fixture("fixtures/response_signed_assertion.xml")

        ResponseProcessor(self.source, request).parse()

    def test_verification_assertion_duplicate(self):
        """Test verifying signature inside assertion, where the response has another assertion
        before our signed assertion"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_assertion = True
        self.source.signed_response = False
        request = self._post_fixture("fixtures/response_signed_assertion_dup.xml")

        processor = ResponseProcessor(self.source, request)
        processor.parse()
        self.assertNotEqual(processor._get_name_id().text, "bad")
        self.assertEqual(
            processor._get_name_id().text,
            "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7",
        )

    def test_verification_response(self):
        """Test verifying signature inside response"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_response = True
        self.source.signed_assertion = False
        request = self._post_fixture("fixtures/response_signed_response.xml")

        ResponseProcessor(self.source, request).parse()

    def test_verification_response_and_assertion(self):
        """Test verifying signature inside response and assertion"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_assertion = True
        self.source.signed_response = True
        request = self._post_fixture("fixtures/response_signed_response_and_assertion.xml")

        ResponseProcessor(self.source, request).parse()

    def test_verification_wrong_signature(self):
        """Test invalid signature fails"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_assertion = True
        # Same as response_signed_assertion.xml but the role name is altered
        request = self._post_fixture("fixtures/response_signed_error.xml")

        processor = ResponseProcessor(self.source, request)

        with self.assertRaisesMessage(InvalidSignature, ""):
            processor.parse()

    def test_verification_no_signature(self):
        """Test rejecting response without signature when signed_assertion is True"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_assertion = True
        request = self._post_fixture("fixtures/response_success.xml")

        processor = ResponseProcessor(self.source, request)

        with self.assertRaisesMessage(InvalidSignature, ""):
            processor.parse()

    def test_verification_incorrect_response(self):
        """Test that an incorrectly signed response is rejected"""
        cert_pem = load_fixture("fixtures/signature_cert.pem")
        keypair = CertificateKeyPair.objects.create(name=generate_id(), certificate_data=cert_pem)
        self.source.verification_kp = keypair
        self.source.signed_response = True
        self.source.signed_assertion = False
        request = self._post_fixture("fixtures/response_incorrect_signed_response.xml")

        processor = ResponseProcessor(self.source, request)
        with self.assertRaisesMessage(InvalidSignature, ""):
            processor.parse()

    def test_signed_encrypted_response(self):
        """Test signed & encrypted response"""
        cert_pem = load_fixture("fixtures/signature_cert2.pem")
        verification_pair = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data=cert_pem,
        )

        private_key = load_fixture("fixtures/encrypted-key2.pem")
        encryption_pair = CertificateKeyPair.objects.create(
            name=generate_id(),
            key_data=private_key,
        )

        self.source.verification_kp = verification_pair
        self.source.encryption_kp = encryption_pair
        self.source.signed_response = True
        self.source.signed_assertion = False
        request = self._post_fixture("fixtures/response_signed_encrypted.xml")

        ResponseProcessor(self.source, request).parse()

    def test_transient(self):
        """Test SAML transient NameID"""
        cert_pem = load_fixture("fixtures/signature_cert2.pem")
        verification_pair = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data=cert_pem,
        )
        self.source.verification_kp = verification_pair
        self.source.signed_response = True
        self.source.signed_assertion = False
        request = self._post_fixture("fixtures/response_transient.xml")

        processor = ResponseProcessor(self.source, request)
        processor.parse()
        processor.prepare_flow_manager()
Test ResponseProcessor
def setUp(self):
    """Prepare a request factory and a newly created SAML source for the test."""
    self.factory = RequestFactory()
    source_fields = {
        "name": generate_id(),
        "slug": generate_id(),
        "issuer": "authentik",
        "allow_idp_initiated": True,
        "pre_authentication_flow": create_test_flow(),
    }
    self.source = SAMLSource.objects.create(**source_fields)
Hook method for setting up the test fixture before exercising it.
def test_status_error(self):
    """Test error status: parsing the error fixture raises ValueError with the expected text."""
    response_xml = load_fixture("fixtures/response_error.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    expected_message = (
        "Invalid request, ACS Url in request http://localhost:9000/source/saml/google/acs/ "
        "doesn't match configured ACS Url https://127.0.0.1:9443/source/saml/google/acs/."
    )
    with self.assertRaisesMessage(ValueError, expected_message):
        ResponseProcessor(self.source, request).parse()
Test error status
def test_success(self):
    """Test success: parse the success fixture and compare the resulting user properties."""
    response_xml = load_fixture("fixtures/response_success.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
    flow_manager = processor.prepare_flow_manager()

    expected_properties = {
        "email": "foo@bar.baz",
        "name": "foo",
        "sn": "bar",
        "username": "jens@goauthentik.io",
        "attributes": {},
        "path": self.source.get_user_path(),
    }
    self.assertEqual(flow_manager.user_properties, expected_properties)
Test success
def test_success_with_status_message_and_detail(self):
    """Test success with StatusMessage and StatusDetail present (should not raise error)"""
    response_xml = load_fixture("fixtures/response_success_with_message.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
    flow_manager = processor.prepare_flow_manager()
    username = flow_manager.user_properties["username"]
    self.assertEqual(username, "jens@goauthentik.io")
Test success with StatusMessage and StatusDetail present (should not raise error)
def test_error_with_message_and_detail(self):
    """Test error status with StatusMessage and StatusDetail includes both in error"""
    response_xml = load_fixture("fixtures/response_error_with_detail.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    with self.assertRaises(ValueError) as ctx:
        ResponseProcessor(self.source, request).parse()
    # Should contain both detail and message
    for expected_fragment in ("User account is disabled", "Authentication failed"):
        self.assertIn(expected_fragment, str(ctx.exception))
Test error status with StatusMessage and StatusDetail includes both in error
def test_encrypted_correct(self):
    """Test encrypted: parse the encrypted fixture using the key from encrypted-key.pem."""
    private_key = load_fixture("fixtures/encrypted-key.pem")
    self.source.encryption_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        key_data=private_key,
    )

    response_xml = load_fixture("fixtures/response_encrypted.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
Test encrypted
def test_encrypted_incorrect_key(self):
    """Test encrypted response with a freshly generated test cert raises InvalidEncryption"""
    self.source.encryption_kp = create_test_cert()

    response_xml = load_fixture("fixtures/response_encrypted.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    with self.assertRaises(InvalidEncryption):
        processor.parse()
Test encrypted response with a non-matching key (expects InvalidEncryption)
def test_verification_assertion(self):
    """Test verifying signature inside assertion"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    self.source.verification_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.signed_assertion = True
    self.source.signed_response = False

    response_xml = load_fixture("fixtures/response_signed_assertion.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
Test verifying signature inside assertion
def test_verification_assertion_duplicate(self):
    """Test verifying signature inside assertion, where the response has another assertion
    before our signed assertion"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    self.source.verification_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.signed_assertion = True
    self.source.signed_response = False

    response_xml = load_fixture("fixtures/response_signed_assertion_dup.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
    self.assertNotEqual(processor._get_name_id().text, "bad")
    self.assertEqual(
        processor._get_name_id().text,
        "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7",
    )
Test verifying signature inside assertion, where the response has another assertion before our signed assertion
def test_verification_response(self):
    """Test verifying signature inside response"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    self.source.verification_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.signed_response = True
    self.source.signed_assertion = False

    response_xml = load_fixture("fixtures/response_signed_response.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
Test verifying signature inside response
def test_verification_response_and_assertion(self):
    """Test verifying signature inside response and assertion"""
    cert_pem = load_fixture("fixtures/signature_cert.pem")
    self.source.verification_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.signed_assertion = True
    self.source.signed_response = True

    response_xml = load_fixture("fixtures/response_signed_response_and_assertion.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
Test verifying signature inside response and assertion
def test_verification_wrong_signature(self):
    """Test invalid signature fails

    The source requires a signed assertion and is given a fixture whose signed
    content was altered; parsing must raise InvalidSignature.
    """
    key = load_fixture("fixtures/signature_cert.pem")
    kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=key,
    )
    self.source.verification_kp = kp
    self.source.signed_assertion = True
    request = self.factory.post(
        "/",
        data={
            "SAMLResponse": b64encode(
                # Same as response_signed_assertion.xml but the role name is altered
                load_fixture("fixtures/response_signed_error.xml").encode()
            ).decode()
        },
    )

    parser = ResponseProcessor(self.source, request)

    # assertRaisesMessage with an empty expected message matches any exception
    # text, so plain assertRaises expresses the same check.
    with self.assertRaises(InvalidSignature):
        parser.parse()
Test invalid signature fails
def test_verification_no_signature(self):
    """Test rejecting response without signature when signed_assertion is True

    The fixture response_success.xml is also used by test_success, where no
    signature checking is configured on the source.
    """
    key = load_fixture("fixtures/signature_cert.pem")
    kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=key,
    )
    self.source.verification_kp = kp
    self.source.signed_assertion = True
    request = self.factory.post(
        "/",
        data={
            "SAMLResponse": b64encode(
                load_fixture("fixtures/response_success.xml").encode()
            ).decode()
        },
    )

    parser = ResponseProcessor(self.source, request)

    # assertRaisesMessage with an empty expected message matches any exception
    # text, so plain assertRaises expresses the same check.
    with self.assertRaises(InvalidSignature):
        parser.parse()
Test rejecting response without signature when signed_assertion is True
def test_verification_incorrect_response(self):
    """Test that a response whose signature does not verify is rejected

    The source requires a signed response (not a signed assertion); parsing the
    incorrectly signed fixture must raise InvalidSignature.
    """
    key = load_fixture("fixtures/signature_cert.pem")
    kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=key,
    )
    self.source.verification_kp = kp
    self.source.signed_response = True
    self.source.signed_assertion = False
    request = self.factory.post(
        "/",
        data={
            "SAMLResponse": b64encode(
                load_fixture("fixtures/response_incorrect_signed_response.xml").encode()
            ).decode()
        },
    )

    parser = ResponseProcessor(self.source, request)
    # assertRaisesMessage with an empty expected message matches any exception
    # text, so plain assertRaises expresses the same check.
    with self.assertRaises(InvalidSignature):
        parser.parse()
Test that a response with an incorrect signature is rejected (expects InvalidSignature)
def test_signed_encrypted_response(self):
    """Test signed & encrypted response"""
    cert_pem = load_fixture("fixtures/signature_cert2.pem")
    verification_pair = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )

    private_key = load_fixture("fixtures/encrypted-key2.pem")
    encryption_pair = CertificateKeyPair.objects.create(
        name=generate_id(),
        key_data=private_key,
    )

    self.source.verification_kp = verification_pair
    self.source.encryption_kp = encryption_pair
    self.source.signed_response = True
    self.source.signed_assertion = False

    response_xml = load_fixture("fixtures/response_signed_encrypted.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
Test signed & encrypted response
def test_transient(self):
    """Test SAML transient NameID"""
    cert_pem = load_fixture("fixtures/signature_cert2.pem")
    self.source.verification_kp = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=cert_pem,
    )
    self.source.signed_response = True
    self.source.signed_assertion = False

    response_xml = load_fixture("fixtures/response_transient.xml")
    encoded = b64encode(response_xml.encode()).decode()
    request = self.factory.post("/", data={"SAMLResponse": encoded})

    processor = ResponseProcessor(self.source, request)
    processor.parse()
    processor.prepare_flow_manager()
Test SAML transient NameID