authentik.crypto.tests
Crypto tests
"""Crypto tests"""

from json import loads
from os import makedirs
from pathlib import Path
from tempfile import TemporaryDirectory

from cryptography.x509.extensions import SubjectAlternativeName
from cryptography.x509.general_name import DNSName
from django.urls import reverse
from django.utils.timezone import now
from rest_framework.test import APITestCase

from authentik.core.api.used_by import DeleteAction
from authentik.core.tests.utils import (
    create_test_admin_user,
    create_test_cert,
    create_test_flow,
    create_test_user,
)
from authentik.crypto.api import CertificateKeyPairSerializer
from authentik.crypto.builder import CertificateBuilder
from authentik.crypto.models import CertificateKeyPair, generate_key_id, generate_key_id_legacy
from authentik.crypto.tasks import MANAGED_DISCOVERED, certificate_discovery
from authentik.lib.config import CONFIG
from authentik.lib.generators import generate_id, generate_key
from authentik.providers.oauth2.models import OAuth2Provider, RedirectURI, RedirectURIMatchingMode


class TestCrypto(APITestCase):
    """Test Crypto validation

    Covers the CertificateKeyPair model, its serializer, the certificate
    builder, the certificatekeypair API endpoints, certificate discovery
    and the metadata/kid fields stored on the model."""

    def test_model_private(self):
        """Test model private key is None when key_data cannot be parsed"""
        cert = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data="foo",
            key_data="foo",
        )
        self.assertIsNone(cert.private_key)

    def test_serializer(self):
        """Test API Validation"""
        keypair = create_test_cert()
        # Re-submitting the keypair's own data validates
        self.assertTrue(
            CertificateKeyPairSerializer(
                instance=keypair,
                data={
                    "name": keypair.name,
                    "certificate_data": keypair.certificate_data,
                    "key_data": keypair.key_data,
                },
            ).is_valid()
        )
        # Garbage certificate/key data is rejected
        self.assertFalse(
            CertificateKeyPairSerializer(
                instance=keypair,
                data={
                    "name": keypair.name,
                    "certificate_data": "test",
                    "key_data": "test",
                },
            ).is_valid()
        )

    def test_builder(self):
        """Test Builder"""
        name = generate_id()
        builder = CertificateBuilder(name)
        # Saving before build() has been called must fail
        with self.assertRaises(ValueError):
            builder.save()
        builder.build(
            subject_alt_names=[],
            validity_days=3,
        )
        instance = builder.save()
        _now = now()
        self.assertEqual(instance.name, name)
        # `_now` is sampled after building, so presumably just under 3 days of
        # validity remain; `timedelta.days` truncates that to 2
        self.assertEqual((instance.certificate.not_valid_after_utc - _now).days, 2)

    def test_builder_api(self):
        """Test Builder (via API)"""
        self.client.force_login(create_test_admin_user())
        name = generate_id()
        self.client.post(
            reverse("authentik_api:certificatekeypair-generate"),
            data={"common_name": name, "subject_alt_name": "bar,baz", "validity_days": 3},
        )
        key = CertificateKeyPair.objects.filter(name=name).first()
        self.assertIsNotNone(key)
        ext: SubjectAlternativeName = key.certificate.extensions[0].value
        self.assertIsInstance(ext, SubjectAlternativeName)
        # Each comma-separated entry becomes its own DNS name, in order
        self.assertIsInstance(ext[0], DNSName)
        self.assertEqual(ext[0].value, "bar")
        self.assertIsInstance(ext[1], DNSName)
        self.assertEqual(ext[1].value, "baz")

    def test_builder_api_duplicate(self):
        """Test Builder (via API) rejects a common name that is already in use"""
        cert = create_test_cert()
        self.client.force_login(create_test_admin_user())
        res = self.client.post(
            reverse("authentik_api:certificatekeypair-generate"),
            data={"common_name": cert.name, "subject_alt_name": "bar,baz", "validity_days": 3},
        )
        self.assertEqual(res.status_code, 400)
        self.assertJSONEqual(res.content, {"common_name": ["This field must be unique."]})

    def test_builder_api_empty_san(self):
        """Test Builder (via API) with an empty subject_alt_name"""
        self.client.force_login(create_test_admin_user())
        name = generate_id()
        self.client.post(
            reverse("authentik_api:certificatekeypair-generate"),
            data={"common_name": name, "subject_alt_name": "", "validity_days": 3},
        )
        key = CertificateKeyPair.objects.filter(name=name).first()
        self.assertIsNotNone(key)
        self.assertEqual(len(key.certificate.extensions), 0)

    def test_builder_api_empty_san_multiple(self):
        """Test Builder (via API) with a subject_alt_name made up of blank entries only"""
        self.client.force_login(create_test_admin_user())
        name = generate_id()
        self.client.post(
            reverse("authentik_api:certificatekeypair-generate"),
            data={"common_name": name, "subject_alt_name": ", ", "validity_days": 3},
        )
        key = CertificateKeyPair.objects.filter(name=name).first()
        self.assertIsNotNone(key)
        self.assertEqual(len(key.certificate.extensions), 0)

    def test_builder_api_invalid(self):
        """Test Builder (via API) (invalid)"""
        self.client.force_login(create_test_admin_user())
        response = self.client.post(
            reverse("authentik_api:certificatekeypair-generate"),
            data={},
        )
        self.assertEqual(response.status_code, 400)

    def test_list(self):
        """Test API List"""
        cert = create_test_cert()
        self.client.force_login(create_test_admin_user())
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-list",
            ),
            data={"name": cert.name},
        )
        self.assertEqual(response.status_code, 200)
        body = loads(response.content.decode())
        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)

    def test_list_has_key_false(self):
        """Test API List with has_key set to false"""
        cert = create_test_cert()
        # Strip the private key so the keypair matches has_key=False
        cert.key_data = ""
        cert.save()
        self.client.force_login(create_test_admin_user())
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-list",
            ),
            data={"name": cert.name, "has_key": False},
        )
        self.assertEqual(response.status_code, 200)
        body = loads(response.content.decode())
        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)

    def test_list_always_includes_details(self):
        """Test API List always includes certificate details"""
        cert = create_test_cert()
        self.client.force_login(create_test_admin_user())
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-list",
            ),
            data={"name": cert.name},
        )
        self.assertEqual(response.status_code, 200)
        body = loads(response.content.decode())
        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
        # All details should now always be included
        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
        self.assertIsNotNone(api_cert["cert_expiry"])
        self.assertIsNotNone(api_cert["cert_subject"])

    def test_certificate_download(self):
        """Test certificate export (download)"""
        keypair = create_test_cert()
        user = create_test_user()
        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
        user.assign_perms_to_managed_role(
            "authentik_crypto.view_certificatekeypair_certificate", keypair
        )
        self.client.force_login(user)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 200)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": keypair.pk},
            ),
            data={"download": True},
        )
        self.assertEqual(response.status_code, 200)
        self.assertIn("Content-Disposition", response)

    def test_private_key_download(self):
        """Test private_key export (download)"""
        keypair = create_test_cert()
        user = create_test_user()
        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
        self.client.force_login(user)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 200)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": keypair.pk},
            ),
            data={"download": True},
        )
        self.assertEqual(response.status_code, 200)
        self.assertIn("Content-Disposition", response)

    def test_certificate_download_denied(self):
        """Test certificate export (download) is denied without assigned permissions"""
        self.client.force_login(create_test_user())
        keypair = create_test_cert()
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 403)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": keypair.pk},
            ),
            data={"download": True},
        )
        self.assertEqual(response.status_code, 403)

    def test_private_key_download_denied(self):
        """Test private_key export (download) is denied without assigned permissions"""
        self.client.force_login(create_test_user())
        keypair = create_test_cert()
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 403)
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": keypair.pk},
            ),
            data={"download": True},
        )
        self.assertEqual(response.status_code, 403)

    def test_used_by(self):
        """Test used_by endpoint"""
        self.client.force_login(create_test_admin_user())
        keypair = create_test_cert()
        provider = OAuth2Provider.objects.create(
            name=generate_id(),
            client_id=generate_id(),
            client_secret=generate_key(),
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=keypair,
        )
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-used-by",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 200)
        self.assertJSONEqual(
            response.content.decode(),
            [
                {
                    "app": "authentik_providers_oauth2",
                    "model_name": "oauth2provider",
                    "pk": str(provider.pk),
                    "name": str(provider),
                    "action": DeleteAction.SET_NULL.value,
                }
            ],
        )

    def test_used_by_denied(self):
        """Test used_by endpoint is denied for a logged-out client"""
        self.client.logout()
        keypair = create_test_cert()
        OAuth2Provider.objects.create(
            name=generate_id(),
            client_id=generate_id(),
            client_secret=generate_key(),
            authorization_flow=create_test_flow(),
            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
            signing_key=keypair,
        )
        response = self.client.get(
            reverse(
                "authentik_api:certificatekeypair-used-by",
                kwargs={"pk": keypair.pk},
            )
        )
        self.assertEqual(response.status_code, 403)

    def test_discovery(self):
        """Test certificate discovery"""
        # This test generates 3 certificates: two are written together with
        # their private key and must be imported, the third is written as a
        # certificate only and must not be imported
        name = generate_id()
        builder = CertificateBuilder(name)
        with self.assertRaises(ValueError):
            builder.save()
        builder.build(
            subject_alt_names=[],
            validity_days=3,
        )

        name2 = generate_id()
        builder2 = CertificateBuilder(name2)
        with self.assertRaises(ValueError):
            builder2.save()
        builder2.build(
            subject_alt_names=[],
            validity_days=3,
        )

        name3 = generate_id()
        builder3 = CertificateBuilder(name3)
        with self.assertRaises(ValueError):
            builder3.save()
        builder3.build(
            subject_alt_names=[],
            validity_days=3,
        )

        with TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            # Pair stored as <name>.pem + <name>.key
            (root / "foo.pem").write_text(builder.certificate, encoding="utf-8")
            (root / "foo.key").write_text(builder.private_key, encoding="utf-8")
            # Pair stored as <name>/fullchain.pem + <name>/privkey.pem
            makedirs(root / "foo.bar", exist_ok=True)
            (root / "foo.bar" / "fullchain.pem").write_text(
                builder2.certificate, encoding="utf-8"
            )
            (root / "foo.bar" / "privkey.pem").write_text(builder2.private_key, encoding="utf-8")
            # Certificate only, no key file is written for it
            (root / "tls-combined.pem").write_text(builder3.certificate, encoding="utf-8")
            with CONFIG.patch("cert_discovery_dir", temp_dir):
                certificate_discovery.send()
        keypair: CertificateKeyPair = CertificateKeyPair.objects.filter(
            managed=MANAGED_DISCOVERED % "foo"
        ).first()
        self.assertIsNotNone(keypair)
        self.assertIsNotNone(keypair.certificate)
        self.assertIsNotNone(keypair.private_key)
        self.assertTrue(
            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "foo.bar").exists()
        )
        self.assertFalse(
            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "tls-combined").exists()
        )

    def test_discovery_updating_same_private_key(self):
        """Test certificate discovery updating certs with matching private keys"""
        name = generate_id()
        builder = CertificateBuilder(name)
        builder.build(
            subject_alt_names=[],
            validity_days=3,
        )

        with TemporaryDirectory() as temp_dir:
            root = Path(temp_dir)
            # First discovery: write cert as "original"
            (root / "original.pem").write_text(builder.certificate, encoding="utf-8")
            (root / "original.key").write_text(builder.private_key, encoding="utf-8")

            with CONFIG.patch("cert_discovery_dir", temp_dir):
                certificate_discovery.send()

            # Verify "original" cert was created
            original = CertificateKeyPair.objects.filter(
                managed=MANAGED_DISCOVERED % "original"
            ).first()
            self.assertIsNotNone(original)
            self.assertEqual(original.name, "original")
            self.assertIsNotNone(original.private_key)

            # Second discovery: write same cert/key as "renamed"
            (root / "original.pem").unlink()
            (root / "original.key").unlink()

            (root / "renamed.pem").write_text(builder.certificate, encoding="utf-8")
            (root / "renamed.key").write_text(builder.private_key, encoding="utf-8")

            with CONFIG.patch("cert_discovery_dir", temp_dir):
                certificate_discovery.send()

            # Verify the cert was updated
            renamed = CertificateKeyPair.objects.filter(
                managed=MANAGED_DISCOVERED % "renamed"
            ).first()
            self.assertIsNotNone(renamed, "Renamed certificate should exist")
            self.assertEqual(renamed.name, "renamed")
            self.assertEqual(renamed.pk, original.pk, "Should be same database object")

            # Verify no new cert was created
            final_count = CertificateKeyPair.objects.filter(
                managed__startswith="goauthentik.io/crypto/discovered/"
            ).count()
            self.assertEqual(
                final_count, 1, "Should not create duplicate cert for same private key"
            )

    def test_metadata_extraction_with_cert_and_key(self):
        """Test that metadata is extracted when creating keypair with certificate and key"""
        cert = create_test_cert()

        # Verify all metadata fields are populated
        self.assertIsNotNone(cert.key_type)
        self.assertIsNotNone(cert.cert_expiry)
        self.assertIsNotNone(cert.cert_subject)
        self.assertIsNotNone(cert.fingerprint_sha256)
        self.assertIsNotNone(cert.fingerprint_sha1)

        # Verify kid is generated using SHA512 for new records
        self.assertIsNotNone(cert.kid)
        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

    def test_metadata_extraction_without_key(self):
        """Test that metadata is extracted when creating keypair without private key"""
        builder = CertificateBuilder(generate_id())
        builder.build(subject_alt_names=[], validity_days=3)

        # Create keypair with only certificate, no key
        cert = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data=builder.certificate,
            key_data="",
        )

        # Verify certificate metadata fields are populated
        self.assertIsNotNone(cert.key_type)
        self.assertIsNotNone(cert.cert_expiry)
        self.assertIsNotNone(cert.cert_subject)
        self.assertIsNotNone(cert.fingerprint_sha256)
        self.assertIsNotNone(cert.fingerprint_sha1)

        # Verify kid is None when there is no key_data
        self.assertIsNone(cert.kid)

    def test_metadata_extraction_invalid_cert(self):
        """Test that invalid certificate data doesn't crash, just skips metadata"""
        cert = CertificateKeyPair.objects.create(
            name=generate_id(),
            certificate_data="invalid certificate data",
            key_data="",
        )

        # Verify metadata fields are None for invalid cert
        self.assertIsNone(cert.key_type)
        self.assertIsNone(cert.cert_expiry)
        self.assertIsNone(cert.cert_subject)
        self.assertIsNone(cert.fingerprint_sha256)
        self.assertIsNone(cert.fingerprint_sha1)
        self.assertIsNone(cert.kid)

    def test_kid_legacy_preservation(self):
        """Test that legacy MD5 kid is preserved when key_data hasn't changed"""
        cert = create_test_cert()

        # Simulate a legacy MD5 kid (as if backfilled from old system);
        # a queryset update() writes the column without going through save()
        legacy_kid = generate_key_id_legacy(cert.key_data)
        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
        cert.refresh_from_db()
        self.assertEqual(cert.kid, legacy_kid)

        # Save the cert again (e.g., name change) - kid should be preserved
        cert.name = generate_id()
        cert.save()
        cert.refresh_from_db()

        self.assertEqual(cert.kid, legacy_kid)

    def test_kid_regenerated_on_key_change(self):
        """Test that kid is regenerated when key_data changes"""
        cert = create_test_cert()
        original_kid = cert.kid

        # Generate a new key and update the keypair
        builder = CertificateBuilder(generate_id())
        builder.build(subject_alt_names=[], validity_days=3)

        cert.key_data = builder.private_key
        cert.certificate_data = builder.certificate
        cert.save()
        cert.refresh_from_db()

        # Kid should be regenerated for the new key
        self.assertNotEqual(cert.kid, original_kid)
        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

    def test_kid_regenerated_on_key_change_from_legacy(self):
        """Test that kid is regenerated from legacy MD5 when key_data changes"""
        cert = create_test_cert()

        # Simulate a legacy MD5 kid
        legacy_kid = generate_key_id_legacy(cert.key_data)
        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
        cert.refresh_from_db()
        self.assertEqual(cert.kid, legacy_kid)

        # Generate a new key and update the keypair
        builder = CertificateBuilder(generate_id())
        builder.build(subject_alt_names=[], validity_days=3)

        cert.key_data = builder.private_key
        cert.certificate_data = builder.certificate
        cert.save()
        cert.refresh_from_db()

        # Kid should now be SHA512 for the new key
        self.assertNotEqual(cert.kid, legacy_kid)
        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
31class TestCrypto(APITestCase): 32 """Test Crypto validation""" 33 34 def test_model_private(self): 35 """Test model private key""" 36 cert = CertificateKeyPair.objects.create( 37 name=generate_id(), 38 certificate_data="foo", 39 key_data="foo", 40 ) 41 self.assertIsNone(cert.private_key) 42 43 def test_serializer(self): 44 """Test API Validation""" 45 keypair = create_test_cert() 46 self.assertTrue( 47 CertificateKeyPairSerializer( 48 instance=keypair, 49 data={ 50 "name": keypair.name, 51 "certificate_data": keypair.certificate_data, 52 "key_data": keypair.key_data, 53 }, 54 ).is_valid() 55 ) 56 self.assertFalse( 57 CertificateKeyPairSerializer( 58 instance=keypair, 59 data={ 60 "name": keypair.name, 61 "certificate_data": "test", 62 "key_data": "test", 63 }, 64 ).is_valid() 65 ) 66 67 def test_builder(self): 68 """Test Builder""" 69 name = generate_id() 70 builder = CertificateBuilder(name) 71 with self.assertRaises(ValueError): 72 builder.save() 73 builder.build( 74 subject_alt_names=[], 75 validity_days=3, 76 ) 77 instance = builder.save() 78 _now = now() 79 self.assertEqual(instance.name, name) 80 self.assertEqual((instance.certificate.not_valid_after_utc - _now).days, 2) 81 82 def test_builder_api(self): 83 """Test Builder (via API)""" 84 self.client.force_login(create_test_admin_user()) 85 name = generate_id() 86 self.client.post( 87 reverse("authentik_api:certificatekeypair-generate"), 88 data={"common_name": name, "subject_alt_name": "bar,baz", "validity_days": 3}, 89 ) 90 key = CertificateKeyPair.objects.filter(name=name).first() 91 self.assertIsNotNone(key) 92 ext: SubjectAlternativeName = key.certificate.extensions[0].value 93 self.assertIsInstance(ext, SubjectAlternativeName) 94 self.assertIsInstance(ext[0], DNSName) 95 self.assertEqual(ext[0].value, "bar") 96 self.assertIsInstance(ext[1], DNSName) 97 self.assertEqual(ext[1].value, "baz") 98 99 def test_builder_api_duplicate(self): 100 """Test Builder (via API)""" 101 cert = create_test_cert() 102 
self.client.force_login(create_test_admin_user()) 103 res = self.client.post( 104 reverse("authentik_api:certificatekeypair-generate"), 105 data={"common_name": cert.name, "subject_alt_name": "bar,baz", "validity_days": 3}, 106 ) 107 self.assertEqual(res.status_code, 400) 108 self.assertJSONEqual(res.content, {"common_name": ["This field must be unique."]}) 109 110 def test_builder_api_empty_san(self): 111 """Test Builder (via API)""" 112 self.client.force_login(create_test_admin_user()) 113 name = generate_id() 114 self.client.post( 115 reverse("authentik_api:certificatekeypair-generate"), 116 data={"common_name": name, "subject_alt_name": "", "validity_days": 3}, 117 ) 118 key = CertificateKeyPair.objects.filter(name=name).first() 119 self.assertIsNotNone(key) 120 self.assertEqual(len(key.certificate.extensions), 0) 121 122 def test_builder_api_empty_san_multiple(self): 123 """Test Builder (via API)""" 124 self.client.force_login(create_test_admin_user()) 125 name = generate_id() 126 self.client.post( 127 reverse("authentik_api:certificatekeypair-generate"), 128 data={"common_name": name, "subject_alt_name": ", ", "validity_days": 3}, 129 ) 130 key = CertificateKeyPair.objects.filter(name=name).first() 131 self.assertIsNotNone(key) 132 self.assertEqual(len(key.certificate.extensions), 0) 133 134 def test_builder_api_invalid(self): 135 """Test Builder (via API) (invalid)""" 136 self.client.force_login(create_test_admin_user()) 137 response = self.client.post( 138 reverse("authentik_api:certificatekeypair-generate"), 139 data={}, 140 ) 141 self.assertEqual(response.status_code, 400) 142 143 def test_list(self): 144 """Test API List""" 145 cert = create_test_cert() 146 self.client.force_login(create_test_admin_user()) 147 response = self.client.get( 148 reverse( 149 "authentik_api:certificatekeypair-list", 150 ), 151 data={"name": cert.name}, 152 ) 153 self.assertEqual(response.status_code, 200) 154 body = loads(response.content.decode()) 155 api_cert = [x for x in 
body["results"] if x["name"] == cert.name][0] 156 self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1) 157 self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256) 158 159 def test_list_has_key_false(self): 160 """Test API List with has_key set to false""" 161 cert = create_test_cert() 162 cert.key_data = "" 163 cert.save() 164 self.client.force_login(create_test_admin_user()) 165 response = self.client.get( 166 reverse( 167 "authentik_api:certificatekeypair-list", 168 ), 169 data={"name": cert.name, "has_key": False}, 170 ) 171 self.assertEqual(response.status_code, 200) 172 body = loads(response.content.decode()) 173 api_cert = [x for x in body["results"] if x["name"] == cert.name][0] 174 self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1) 175 self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256) 176 177 def test_list_always_includes_details(self): 178 """Test API List always includes certificate details""" 179 cert = create_test_cert() 180 self.client.force_login(create_test_admin_user()) 181 response = self.client.get( 182 reverse( 183 "authentik_api:certificatekeypair-list", 184 ), 185 data={"name": cert.name}, 186 ) 187 self.assertEqual(response.status_code, 200) 188 body = loads(response.content.decode()) 189 api_cert = [x for x in body["results"] if x["name"] == cert.name][0] 190 # All details should now always be included 191 self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1) 192 self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256) 193 self.assertIsNotNone(api_cert["cert_expiry"]) 194 self.assertIsNotNone(api_cert["cert_subject"]) 195 196 def test_certificate_download(self): 197 """Test certificate export (download)""" 198 keypair = create_test_cert() 199 user = create_test_user() 200 user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair) 201 user.assign_perms_to_managed_role( 202 
"authentik_crypto.view_certificatekeypair_certificate", keypair 203 ) 204 self.client.force_login(user) 205 response = self.client.get( 206 reverse( 207 "authentik_api:certificatekeypair-view-certificate", 208 kwargs={"pk": keypair.pk}, 209 ) 210 ) 211 self.assertEqual(response.status_code, 200) 212 response = self.client.get( 213 reverse( 214 "authentik_api:certificatekeypair-view-certificate", 215 kwargs={"pk": keypair.pk}, 216 ), 217 data={"download": True}, 218 ) 219 self.assertEqual(response.status_code, 200) 220 self.assertIn("Content-Disposition", response) 221 222 def test_private_key_download(self): 223 """Test private_key export (download)""" 224 keypair = create_test_cert() 225 user = create_test_user() 226 user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair) 227 user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair) 228 self.client.force_login(user) 229 response = self.client.get( 230 reverse( 231 "authentik_api:certificatekeypair-view-private-key", 232 kwargs={"pk": keypair.pk}, 233 ) 234 ) 235 self.assertEqual(response.status_code, 200) 236 response = self.client.get( 237 reverse( 238 "authentik_api:certificatekeypair-view-private-key", 239 kwargs={"pk": keypair.pk}, 240 ), 241 data={"download": True}, 242 ) 243 self.assertEqual(response.status_code, 200) 244 self.assertIn("Content-Disposition", response) 245 246 def test_certificate_download_denied(self): 247 """Test certificate export (download)""" 248 self.client.force_login(create_test_user()) 249 keypair = create_test_cert() 250 response = self.client.get( 251 reverse( 252 "authentik_api:certificatekeypair-view-certificate", 253 kwargs={"pk": keypair.pk}, 254 ) 255 ) 256 self.assertEqual(403, response.status_code) 257 response = self.client.get( 258 reverse( 259 "authentik_api:certificatekeypair-view-certificate", 260 kwargs={"pk": keypair.pk}, 261 ), 262 data={"download": True}, 263 ) 264 self.assertEqual(403, 
response.status_code) 265 266 def test_private_key_download_denied(self): 267 """Test private_key export (download)""" 268 self.client.force_login(create_test_user()) 269 keypair = create_test_cert() 270 response = self.client.get( 271 reverse( 272 "authentik_api:certificatekeypair-view-private-key", 273 kwargs={"pk": keypair.pk}, 274 ) 275 ) 276 self.assertEqual(403, response.status_code) 277 response = self.client.get( 278 reverse( 279 "authentik_api:certificatekeypair-view-private-key", 280 kwargs={"pk": keypair.pk}, 281 ), 282 data={"download": True}, 283 ) 284 self.assertEqual(403, response.status_code) 285 286 def test_used_by(self): 287 """Test used_by endpoint""" 288 self.client.force_login(create_test_admin_user()) 289 keypair = create_test_cert() 290 provider = OAuth2Provider.objects.create( 291 name=generate_id(), 292 client_id=generate_id(), 293 client_secret=generate_key(), 294 authorization_flow=create_test_flow(), 295 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 296 signing_key=keypair, 297 ) 298 response = self.client.get( 299 reverse( 300 "authentik_api:certificatekeypair-used-by", 301 kwargs={"pk": keypair.pk}, 302 ) 303 ) 304 self.assertEqual(response.status_code, 200) 305 self.assertJSONEqual( 306 response.content.decode(), 307 [ 308 { 309 "app": "authentik_providers_oauth2", 310 "model_name": "oauth2provider", 311 "pk": str(provider.pk), 312 "name": str(provider), 313 "action": DeleteAction.SET_NULL.value, 314 } 315 ], 316 ) 317 318 def test_used_by_denied(self): 319 """Test used_by endpoint""" 320 self.client.logout() 321 keypair = create_test_cert() 322 OAuth2Provider.objects.create( 323 name=generate_id(), 324 client_id=generate_id(), 325 client_secret=generate_key(), 326 authorization_flow=create_test_flow(), 327 redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")], 328 signing_key=keypair, 329 ) 330 response = self.client.get( 331 reverse( 332 
"authentik_api:certificatekeypair-used-by", 333 kwargs={"pk": keypair.pk}, 334 ) 335 ) 336 self.assertEqual(403, response.status_code) 337 338 def test_discovery(self): 339 """Test certificate discovery""" 340 # This test generates 2 separate cert/key combinations 341 # and verifies they both import properly 342 name = generate_id() 343 builder = CertificateBuilder(name) 344 with self.assertRaises(ValueError): 345 builder.save() 346 builder.build( 347 subject_alt_names=[], 348 validity_days=3, 349 ) 350 351 name2 = generate_id() 352 builder2 = CertificateBuilder(name2) 353 with self.assertRaises(ValueError): 354 builder2.save() 355 builder2.build( 356 subject_alt_names=[], 357 validity_days=3, 358 ) 359 360 name3 = generate_id() 361 builder3 = CertificateBuilder(name3) 362 with self.assertRaises(ValueError): 363 builder3.save() 364 builder3.build( 365 subject_alt_names=[], 366 validity_days=3, 367 ) 368 369 with TemporaryDirectory() as temp_dir: 370 with open(f"{temp_dir}/foo.pem", "w+", encoding="utf-8") as _cert: 371 _cert.write(builder.certificate) 372 with open(f"{temp_dir}/foo.key", "w+", encoding="utf-8") as _key: 373 _key.write(builder.private_key) 374 makedirs(f"{temp_dir}/foo.bar", exist_ok=True) 375 with open(f"{temp_dir}/foo.bar/fullchain.pem", "w+", encoding="utf-8") as _cert: 376 _cert.write(builder2.certificate) 377 with open(f"{temp_dir}/foo.bar/privkey.pem", "w+", encoding="utf-8") as _key: 378 _key.write(builder2.private_key) 379 with open(f"{temp_dir}/tls-combined.pem", "w+", encoding="utf-8") as _cert: 380 _cert.write(builder3.certificate) 381 with CONFIG.patch("cert_discovery_dir", temp_dir): 382 certificate_discovery.send() 383 keypair: CertificateKeyPair = CertificateKeyPair.objects.filter( 384 managed=MANAGED_DISCOVERED % "foo" 385 ).first() 386 self.assertIsNotNone(keypair) 387 self.assertIsNotNone(keypair.certificate) 388 self.assertIsNotNone(keypair.private_key) 389 self.assertTrue( 390 
CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "foo.bar").exists() 391 ) 392 self.assertFalse( 393 CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "tls-combined").exists() 394 ) 395 396 def test_discovery_updating_same_private_key(self): 397 """Test certificate discovery updating certs with matching private keys""" 398 name = generate_id() 399 builder = CertificateBuilder(name) 400 builder.build( 401 subject_alt_names=[], 402 validity_days=3, 403 ) 404 405 with TemporaryDirectory() as temp_dir: 406 # First discovery: write cert as "original" 407 with open(f"{temp_dir}/original.pem", "w+", encoding="utf-8") as _cert: 408 _cert.write(builder.certificate) 409 with open(f"{temp_dir}/original.key", "w+", encoding="utf-8") as _key: 410 _key.write(builder.private_key) 411 412 with CONFIG.patch("cert_discovery_dir", temp_dir): 413 certificate_discovery.send() 414 415 # Verify "original" cert was created 416 original = CertificateKeyPair.objects.filter( 417 managed=MANAGED_DISCOVERED % "original" 418 ).first() 419 self.assertIsNotNone(original) 420 self.assertEqual(original.name, "original") 421 self.assertIsNotNone(original.private_key) 422 423 # Second discovery: write same cert/key as "renamed" 424 Path(f"{temp_dir}/original.pem").unlink() 425 Path(f"{temp_dir}/original.key").unlink() 426 427 with open(f"{temp_dir}/renamed.pem", "w+", encoding="utf-8") as _cert: 428 _cert.write(builder.certificate) 429 with open(f"{temp_dir}/renamed.key", "w+", encoding="utf-8") as _key: 430 _key.write(builder.private_key) 431 432 with CONFIG.patch("cert_discovery_dir", temp_dir): 433 certificate_discovery.send() 434 435 # Verify the cert was updated 436 renamed = CertificateKeyPair.objects.filter( 437 managed=MANAGED_DISCOVERED % "renamed" 438 ).first() 439 self.assertIsNotNone(renamed, "Renamed certificate should exist") 440 self.assertEqual(renamed.name, "renamed") 441 self.assertEqual(renamed.pk, original.pk, "Should be same database object") 442 443 # 
Verify no new cert was created 444 final_count = CertificateKeyPair.objects.filter( 445 managed__startswith="goauthentik.io/crypto/discovered/" 446 ).count() 447 self.assertEqual( 448 1, final_count, "Should not create duplicate cert for same private key" 449 ) 450 451 def test_metadata_extraction_with_cert_and_key(self): 452 """Test that metadata is extracted when creating keypair with certificate and key""" 453 cert = create_test_cert() 454 455 # Verify all metadata fields are populated 456 self.assertIsNotNone(cert.key_type) 457 self.assertIsNotNone(cert.cert_expiry) 458 self.assertIsNotNone(cert.cert_subject) 459 self.assertIsNotNone(cert.fingerprint_sha256) 460 self.assertIsNotNone(cert.fingerprint_sha1) 461 462 # Verify kid is generated using SHA512 for new records 463 self.assertIsNotNone(cert.kid) 464 self.assertEqual(cert.kid, generate_key_id(cert.key_data)) 465 466 def test_metadata_extraction_without_key(self): 467 """Test that metadata is extracted when creating keypair without private key""" 468 builder = CertificateBuilder(generate_id()) 469 builder.build(subject_alt_names=[], validity_days=3) 470 471 # Create keypair with only certificate, no key 472 cert = CertificateKeyPair.objects.create( 473 name=generate_id(), 474 certificate_data=builder.certificate, 475 key_data="", 476 ) 477 478 # Verify certificate metadata fields are populated 479 self.assertIsNotNone(cert.key_type) 480 self.assertIsNotNone(cert.cert_expiry) 481 self.assertIsNotNone(cert.cert_subject) 482 self.assertIsNotNone(cert.fingerprint_sha256) 483 self.assertIsNotNone(cert.fingerprint_sha1) 484 485 # Verify kid is empty when no key_data 486 self.assertEqual(cert.kid, None) 487 488 def test_metadata_extraction_invalid_cert(self): 489 """Test that invalid certificate data doesn't crash, just skips metadata""" 490 cert = CertificateKeyPair.objects.create( 491 name=generate_id(), 492 certificate_data="invalid certificate data", 493 key_data="", 494 ) 495 496 # Verify metadata fields are 
None for invalid cert 497 self.assertIsNone(cert.key_type) 498 self.assertIsNone(cert.cert_expiry) 499 self.assertIsNone(cert.cert_subject) 500 self.assertIsNone(cert.fingerprint_sha256) 501 self.assertIsNone(cert.fingerprint_sha1) 502 self.assertIsNone(cert.kid) 503 504 def test_kid_legacy_preservation(self): 505 """Test that legacy MD5 kid is preserved when key_data hasn't changed""" 506 cert = create_test_cert() 507 508 # Simulate a legacy MD5 kid (as if backfilled from old system) 509 legacy_kid = generate_key_id_legacy(cert.key_data) 510 CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid) 511 cert.refresh_from_db() 512 self.assertEqual(cert.kid, legacy_kid) 513 514 # Save the cert again (e.g., name change) - kid should be preserved 515 cert.name = generate_id() 516 cert.save() 517 cert.refresh_from_db() 518 519 self.assertEqual(cert.kid, legacy_kid) 520 521 def test_kid_regenerated_on_key_change(self): 522 """Test that kid is regenerated when key_data changes""" 523 cert = create_test_cert() 524 original_kid = cert.kid 525 526 # Generate a new key and update the keypair 527 builder = CertificateBuilder(generate_id()) 528 builder.build(subject_alt_names=[], validity_days=3) 529 530 cert.key_data = builder.private_key 531 cert.certificate_data = builder.certificate 532 cert.save() 533 cert.refresh_from_db() 534 535 # Kid should be regenerated for the new key 536 self.assertNotEqual(cert.kid, original_kid) 537 self.assertEqual(cert.kid, generate_key_id(cert.key_data)) 538 539 def test_kid_regenerated_on_key_change_from_legacy(self): 540 """Test that kid is regenerated from legacy MD5 when key_data changes""" 541 cert = create_test_cert() 542 543 # Simulate a legacy MD5 kid 544 legacy_kid = generate_key_id_legacy(cert.key_data) 545 CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid) 546 cert.refresh_from_db() 547 self.assertEqual(cert.kid, legacy_kid) 548 549 # Generate a new key and update the keypair 550 builder = 
CertificateBuilder(generate_id()) 551 builder.build(subject_alt_names=[], validity_days=3) 552 553 cert.key_data = builder.private_key 554 cert.certificate_data = builder.certificate 555 cert.save() 556 cert.refresh_from_db() 557 558 # Kid should now be SHA512 for the new key 559 self.assertNotEqual(cert.kid, legacy_kid) 560 self.assertEqual(cert.kid, generate_key_id(cert.key_data))
Test Crypto validation
def test_model_private(self):
    """A keypair created with placeholder PEM text exposes no private key."""
    keypair = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data="foo",
        key_data="foo",
    )
    loaded_key = keypair.private_key
    self.assertIsNone(loaded_key)
Test model private key
def test_serializer(self):
    """Serializer accepts the keypair's own PEM data and rejects placeholder text."""
    keypair = create_test_cert()

    # Payload that echoes the data already stored on the instance
    accepted = CertificateKeyPairSerializer(
        instance=keypair,
        data={
            "name": keypair.name,
            "certificate_data": keypair.certificate_data,
            "key_data": keypair.key_data,
        },
    )
    self.assertTrue(accepted.is_valid())

    # Same name, but certificate and key replaced by non-PEM text
    rejected = CertificateKeyPairSerializer(
        instance=keypair,
        data={
            "name": keypair.name,
            "certificate_data": "test",
            "key_data": "test",
        },
    )
    self.assertFalse(rejected.is_valid())
Test API Validation
def test_builder(self):
    """Builder refuses to save before build() and then saves a named certificate."""
    common_name = generate_id()
    cert_builder = CertificateBuilder(common_name)

    # Saving before anything was built must raise
    with self.assertRaises(ValueError):
        cert_builder.save()

    cert_builder.build(subject_alt_names=[], validity_days=3)
    saved = cert_builder.save()
    reference_time = now()

    self.assertEqual(saved.name, common_name)
    # With validity_days=3 the test expects 2 whole days left at this point
    remaining = saved.certificate.not_valid_after_utc - reference_time
    self.assertEqual(remaining.days, 2)
Test Builder
def test_builder_api(self):
    """Generate endpoint turns a comma-separated subject_alt_name into DNS SAN entries."""
    self.client.force_login(create_test_admin_user())
    common_name = generate_id()
    generate_url = reverse("authentik_api:certificatekeypair-generate")
    self.client.post(
        generate_url,
        data={"common_name": common_name, "subject_alt_name": "bar,baz", "validity_days": 3},
    )

    generated = CertificateKeyPair.objects.filter(name=common_name).first()
    self.assertIsNotNone(generated)

    # The first extension of the generated certificate is expected to be the SAN
    san: SubjectAlternativeName = generated.certificate.extensions[0].value
    self.assertIsInstance(san, SubjectAlternativeName)
    for position, expected_name in enumerate(("bar", "baz")):
        self.assertIsInstance(san[position], DNSName)
        self.assertEqual(san[position].value, expected_name)
Test Builder (via API)
def test_builder_api_duplicate(self):
    """Generate endpoint answers 400 when common_name is the name of an existing keypair."""
    existing = create_test_cert()
    self.client.force_login(create_test_admin_user())

    generate_url = reverse("authentik_api:certificatekeypair-generate")
    payload = {"common_name": existing.name, "subject_alt_name": "bar,baz", "validity_days": 3}
    response = self.client.post(generate_url, data=payload)

    self.assertEqual(response.status_code, 400)
    expected_errors = {"common_name": ["This field must be unique."]}
    self.assertJSONEqual(response.content, expected_errors)
Test Builder (via API)
def test_builder_api_empty_san(self):
    """Generate endpoint with an empty subject_alt_name yields a cert without extensions."""
    self.client.force_login(create_test_admin_user())
    common_name = generate_id()
    generate_url = reverse("authentik_api:certificatekeypair-generate")
    self.client.post(
        generate_url,
        data={"common_name": common_name, "subject_alt_name": "", "validity_days": 3},
    )

    generated = CertificateKeyPair.objects.filter(name=common_name).first()
    self.assertIsNotNone(generated)
    extension_count = len(generated.certificate.extensions)
    self.assertEqual(extension_count, 0)
Test Builder (via API)
def test_builder_api_empty_san_multiple(self):
    """Generate endpoint given only a separator (", ") as SAN yields a cert without extensions."""
    self.client.force_login(create_test_admin_user())
    common_name = generate_id()
    generate_url = reverse("authentik_api:certificatekeypair-generate")
    self.client.post(
        generate_url,
        data={"common_name": common_name, "subject_alt_name": ", ", "validity_days": 3},
    )

    generated = CertificateKeyPair.objects.filter(name=common_name).first()
    self.assertIsNotNone(generated)
    extension_count = len(generated.certificate.extensions)
    self.assertEqual(extension_count, 0)
Test Builder (via API)
def test_builder_api_invalid(self):
    """Generate endpoint rejects an empty request body with HTTP 400."""
    self.client.force_login(create_test_admin_user())
    generate_url = reverse("authentik_api:certificatekeypair-generate")
    empty_payload = {}
    response = self.client.post(generate_url, data=empty_payload)
    self.assertEqual(response.status_code, 400)
Test Builder (via API) (invalid)
def test_list(self):
    """List endpoint filtered by name returns the keypair with matching fingerprints."""
    cert = create_test_cert()
    self.client.force_login(create_test_admin_user())

    list_url = reverse("authentik_api:certificatekeypair-list")
    response = self.client.get(list_url, data={"name": cert.name})
    self.assertEqual(response.status_code, 200)

    payload = loads(response.content.decode())
    matching = [entry for entry in payload["results"] if entry["name"] == cert.name]
    listed = matching[0]
    for field in ("fingerprint_sha1", "fingerprint_sha256"):
        self.assertEqual(listed[field], getattr(cert, field))
Test API List
def test_list_has_key_false(self):
    """List endpoint with has_key=False returns a keypair whose key_data was blanked."""
    cert = create_test_cert()
    # Strip the private key so the keypair only carries a certificate
    cert.key_data = ""
    cert.save()
    self.client.force_login(create_test_admin_user())

    list_url = reverse("authentik_api:certificatekeypair-list")
    query = {"name": cert.name, "has_key": False}
    response = self.client.get(list_url, data=query)
    self.assertEqual(response.status_code, 200)

    payload = loads(response.content.decode())
    matching = [entry for entry in payload["results"] if entry["name"] == cert.name]
    listed = matching[0]
    for field in ("fingerprint_sha1", "fingerprint_sha256"):
        self.assertEqual(listed[field], getattr(cert, field))
Test API List with has_key set to false
def test_list_always_includes_details(self):
    """List endpoint returns fingerprints, expiry and subject without extra parameters."""
    cert = create_test_cert()
    self.client.force_login(create_test_admin_user())

    list_url = reverse("authentik_api:certificatekeypair-list")
    response = self.client.get(list_url, data={"name": cert.name})
    self.assertEqual(response.status_code, 200)

    payload = loads(response.content.decode())
    matching = [entry for entry in payload["results"] if entry["name"] == cert.name]
    listed = matching[0]

    # Fingerprints must equal the model's values; expiry and subject only need to be present
    for field in ("fingerprint_sha1", "fingerprint_sha256"):
        self.assertEqual(listed[field], getattr(cert, field))
    for field in ("cert_expiry", "cert_subject"):
        self.assertIsNotNone(listed[field])
Test API List always includes certificate details
def test_certificate_download(self):
    """Certificate view answers 200 with and without download for a user holding the perms."""
    keypair = create_test_cert()
    user = create_test_user()
    required_perms = (
        "authentik_crypto.view_certificatekeypair",
        "authentik_crypto.view_certificatekeypair_certificate",
    )
    for perm in required_perms:
        user.assign_perms_to_managed_role(perm, keypair)
    self.client.force_login(user)

    view_url = reverse(
        "authentik_api:certificatekeypair-view-certificate",
        kwargs={"pk": keypair.pk},
    )

    # Plain view
    inline = self.client.get(view_url)
    self.assertEqual(inline.status_code, 200)

    # Download variant must additionally carry a Content-Disposition header
    attachment = self.client.get(view_url, data={"download": True})
    self.assertEqual(attachment.status_code, 200)
    self.assertIn("Content-Disposition", attachment)
Test certificate export (download)
def test_private_key_download(self):
    """Private-key view answers 200 with and without download for a user holding the perms."""
    keypair = create_test_cert()
    user = create_test_user()
    required_perms = (
        "authentik_crypto.view_certificatekeypair",
        "authentik_crypto.view_certificatekeypair_key",
    )
    for perm in required_perms:
        user.assign_perms_to_managed_role(perm, keypair)
    self.client.force_login(user)

    view_url = reverse(
        "authentik_api:certificatekeypair-view-private-key",
        kwargs={"pk": keypair.pk},
    )

    # Plain view
    inline = self.client.get(view_url)
    self.assertEqual(inline.status_code, 200)

    # Download variant must additionally carry a Content-Disposition header
    attachment = self.client.get(view_url, data={"download": True})
    self.assertEqual(attachment.status_code, 200)
    self.assertIn("Content-Disposition", attachment)
Test private_key export (download)
def test_certificate_download_denied(self):
    """Certificate view answers 403, with and without download, for a user without perms."""
    self.client.force_login(create_test_user())
    keypair = create_test_cert()
    view_url = reverse(
        "authentik_api:certificatekeypair-view-certificate",
        kwargs={"pk": keypair.pk},
    )

    inline = self.client.get(view_url)
    self.assertEqual(403, inline.status_code)

    attachment = self.client.get(view_url, data={"download": True})
    self.assertEqual(403, attachment.status_code)
Test certificate export (download)
def test_private_key_download_denied(self):
    """Private-key view answers 403, with and without download, for a user without perms."""
    self.client.force_login(create_test_user())
    keypair = create_test_cert()
    view_url = reverse(
        "authentik_api:certificatekeypair-view-private-key",
        kwargs={"pk": keypair.pk},
    )

    inline = self.client.get(view_url)
    self.assertEqual(403, inline.status_code)

    attachment = self.client.get(view_url, data={"download": True})
    self.assertEqual(403, attachment.status_code)
Test private_key export (download)
def test_used_by(self):
    """used_by endpoint lists the OAuth2 provider that signs with the keypair."""
    self.client.force_login(create_test_admin_user())
    keypair = create_test_cert()
    # Provider referencing the keypair through its signing_key
    provider = OAuth2Provider.objects.create(
        name=generate_id(),
        client_id=generate_id(),
        client_secret=generate_key(),
        authorization_flow=create_test_flow(),
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=keypair,
    )

    used_by_url = reverse(
        "authentik_api:certificatekeypair-used-by",
        kwargs={"pk": keypair.pk},
    )
    response = self.client.get(used_by_url)
    self.assertEqual(response.status_code, 200)

    expected_entry = {
        "app": "authentik_providers_oauth2",
        "model_name": "oauth2provider",
        "pk": str(provider.pk),
        "name": str(provider),
        "action": DeleteAction.SET_NULL.value,
    }
    self.assertJSONEqual(response.content.decode(), [expected_entry])
Test used_by endpoint
def test_used_by_denied(self):
    """used_by endpoint answers 403 once the client has logged out."""
    self.client.logout()
    keypair = create_test_cert()
    # The provider is only needed as a reference to the keypair; its handle is not used
    OAuth2Provider.objects.create(
        name=generate_id(),
        client_id=generate_id(),
        client_secret=generate_key(),
        authorization_flow=create_test_flow(),
        redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
        signing_key=keypair,
    )

    used_by_url = reverse(
        "authentik_api:certificatekeypair-used-by",
        kwargs={"pk": keypair.pk},
    )
    response = self.client.get(used_by_url)
    self.assertEqual(403, response.status_code)
Test used_by endpoint
def test_discovery(self):
    """Discovery imports cert/key pairs from the directory and skips a cert-only file."""
    # Three independent certificates are built: two are written together with
    # their private key, the third is written as a certificate only
    built = []
    for _ in range(3):
        pending = CertificateBuilder(generate_id())
        # Saving before build() must raise
        with self.assertRaises(ValueError):
            pending.save()
        pending.build(subject_alt_names=[], validity_days=3)
        built.append(pending)
    flat_pair, nested_pair, cert_only = built

    with TemporaryDirectory() as temp_dir:

        def write_file(relative_path, content):
            """Write text content to relative_path below the temporary directory."""
            with open(f"{temp_dir}/{relative_path}", "w+", encoding="utf-8") as handle:
                handle.write(content)

        # <name>.pem + <name>.key side by side
        write_file("foo.pem", flat_pair.certificate)
        write_file("foo.key", flat_pair.private_key)
        # fullchain.pem + privkey.pem inside a sub-directory
        makedirs(f"{temp_dir}/foo.bar", exist_ok=True)
        write_file("foo.bar/fullchain.pem", nested_pair.certificate)
        write_file("foo.bar/privkey.pem", nested_pair.private_key)
        # Certificate without any key file
        write_file("tls-combined.pem", cert_only.certificate)

        with CONFIG.patch("cert_discovery_dir", temp_dir):
            certificate_discovery.send()

    keypair: CertificateKeyPair = CertificateKeyPair.objects.filter(
        managed=MANAGED_DISCOVERED % "foo"
    ).first()
    self.assertIsNotNone(keypair)
    self.assertIsNotNone(keypair.certificate)
    self.assertIsNotNone(keypair.private_key)

    nested_found = CertificateKeyPair.objects.filter(
        managed=MANAGED_DISCOVERED % "foo.bar"
    ).exists()
    self.assertTrue(nested_found)

    cert_only_found = CertificateKeyPair.objects.filter(
        managed=MANAGED_DISCOVERED % "tls-combined"
    ).exists()
    self.assertFalse(cert_only_found)
Test certificate discovery
def test_discovery_updating_same_private_key(self):
    """Re-discovering an unchanged cert/key under a new file name updates the existing row."""
    cert_builder = CertificateBuilder(generate_id())
    cert_builder.build(subject_alt_names=[], validity_days=3)

    with TemporaryDirectory() as temp_dir:

        def publish_as(stem):
            """Write the built cert and key as <stem>.pem / <stem>.key, then run discovery."""
            with open(f"{temp_dir}/{stem}.pem", "w+", encoding="utf-8") as pem_file:
                pem_file.write(cert_builder.certificate)
            with open(f"{temp_dir}/{stem}.key", "w+", encoding="utf-8") as key_file:
                key_file.write(cert_builder.private_key)
            with CONFIG.patch("cert_discovery_dir", temp_dir):
                certificate_discovery.send()

        # First pass: the pair is discovered under the name "original"
        publish_as("original")
        original = CertificateKeyPair.objects.filter(
            managed=MANAGED_DISCOVERED % "original"
        ).first()
        self.assertIsNotNone(original)
        self.assertEqual(original.name, "original")
        self.assertIsNotNone(original.private_key)

        # Second pass: remove the old files and offer the identical pair as "renamed"
        for suffix in ("pem", "key"):
            Path(f"{temp_dir}/original.{suffix}").unlink()
        publish_as("renamed")

        # The existing row must have been re-pointed rather than a new one created
        renamed = CertificateKeyPair.objects.filter(
            managed=MANAGED_DISCOVERED % "renamed"
        ).first()
        self.assertIsNotNone(renamed, "Renamed certificate should exist")
        self.assertEqual(renamed.name, "renamed")
        self.assertEqual(renamed.pk, original.pk, "Should be same database object")

        # Exactly one discovered keypair may exist afterwards
        discovered = CertificateKeyPair.objects.filter(
            managed__startswith="goauthentik.io/crypto/discovered/"
        )
        final_count = discovered.count()
        self.assertEqual(
            1, final_count, "Should not create duplicate cert for same private key"
        )
Test certificate discovery updating certs with matching private keys
def test_metadata_extraction_with_cert_and_key(self):
    """A keypair created with certificate and key has every metadata field and a kid set."""
    cert = create_test_cert()

    # Each certificate-derived field must have been filled in
    metadata_fields = (
        "key_type",
        "cert_expiry",
        "cert_subject",
        "fingerprint_sha256",
        "fingerprint_sha1",
    )
    for field in metadata_fields:
        self.assertIsNotNone(getattr(cert, field))

    # The kid must be present and equal to what generate_key_id yields for the key data
    self.assertIsNotNone(cert.kid)
    expected_kid = generate_key_id(cert.key_data)
    self.assertEqual(cert.kid, expected_kid)
Test that metadata is extracted when creating keypair with certificate and key
def test_metadata_extraction_without_key(self):
    """Test that metadata is extracted when creating keypair without private key"""
    builder = CertificateBuilder(generate_id())
    builder.build(subject_alt_names=[], validity_days=3)

    # Create keypair with only certificate, no key
    cert = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data=builder.certificate,
        key_data="",
    )

    # Verify certificate metadata fields are populated
    self.assertIsNotNone(cert.key_type)
    self.assertIsNotNone(cert.cert_expiry)
    self.assertIsNotNone(cert.cert_subject)
    self.assertIsNotNone(cert.fingerprint_sha256)
    self.assertIsNotNone(cert.fingerprint_sha1)

    # Verify kid is None when there is no key_data; assertIsNone checks identity
    # with None instead of relying on equality
    self.assertIsNone(cert.kid)
Test that metadata is extracted when creating keypair without private key
def test_metadata_extraction_invalid_cert(self):
    """Creating a keypair from unparseable certificate text succeeds with all metadata None."""
    cert = CertificateKeyPair.objects.create(
        name=generate_id(),
        certificate_data="invalid certificate data",
        key_data="",
    )

    # No metadata field, including the kid, may have been populated
    empty_fields = (
        "key_type",
        "cert_expiry",
        "cert_subject",
        "fingerprint_sha256",
        "fingerprint_sha1",
        "kid",
    )
    for field in empty_fields:
        self.assertIsNone(getattr(cert, field))
Test that invalid certificate data doesn't crash, just skips metadata
def test_kid_legacy_preservation(self):
    """A stored legacy kid survives a save that leaves key_data untouched."""
    cert = create_test_cert()

    # Put a legacy-format kid on the row with a queryset update, then reload
    legacy_kid = generate_key_id_legacy(cert.key_data)
    stored_row = CertificateKeyPair.objects.filter(pk=cert.pk)
    stored_row.update(kid=legacy_kid)
    cert.refresh_from_db()
    self.assertEqual(cert.kid, legacy_kid)

    # Change something unrelated to the key and save through the model
    cert.name = generate_id()
    cert.save()
    cert.refresh_from_db()

    self.assertEqual(cert.kid, legacy_kid)
Test that legacy MD5 kid is preserved when key_data hasn't changed
def test_kid_regenerated_on_key_change(self):
    """Replacing key_data on an existing keypair produces a new kid for the new key."""
    cert = create_test_cert()
    kid_before = cert.kid

    # Build an unrelated certificate/key and swap it into the keypair
    replacement = CertificateBuilder(generate_id())
    replacement.build(subject_alt_names=[], validity_days=3)
    cert.key_data = replacement.private_key
    cert.certificate_data = replacement.certificate
    cert.save()
    cert.refresh_from_db()

    # The kid must differ from the old one and match generate_key_id for the new key
    self.assertNotEqual(cert.kid, kid_before)
    expected_kid = generate_key_id(cert.key_data)
    self.assertEqual(cert.kid, expected_kid)
Test that kid is regenerated when key_data changes
def test_kid_regenerated_on_key_change_from_legacy(self):
    """Replacing key_data on a keypair holding a legacy kid switches it to generate_key_id."""
    cert = create_test_cert()

    # Put a legacy-format kid on the row with a queryset update, then reload
    legacy_kid = generate_key_id_legacy(cert.key_data)
    stored_row = CertificateKeyPair.objects.filter(pk=cert.pk)
    stored_row.update(kid=legacy_kid)
    cert.refresh_from_db()
    self.assertEqual(cert.kid, legacy_kid)

    # Build an unrelated certificate/key and swap it into the keypair
    replacement = CertificateBuilder(generate_id())
    replacement.build(subject_alt_names=[], validity_days=3)
    cert.key_data = replacement.private_key
    cert.certificate_data = replacement.certificate
    cert.save()
    cert.refresh_from_db()

    # The legacy value must be gone, replaced by generate_key_id of the new key
    self.assertNotEqual(cert.kid, legacy_kid)
    expected_kid = generate_key_id(cert.key_data)
    self.assertEqual(cert.kid, expected_kid)
Test that kid is regenerated from legacy MD5 when key_data changes