authentik.crypto.tests

Crypto tests

  1"""Crypto tests"""
  2
  3from json import loads
  4from os import makedirs
  5from pathlib import Path
  6from tempfile import TemporaryDirectory
  7
  8from cryptography.x509.extensions import SubjectAlternativeName
  9from cryptography.x509.general_name import DNSName
 10from django.urls import reverse
 11from django.utils.timezone import now
 12from rest_framework.test import APITestCase
 13
 14from authentik.core.api.used_by import DeleteAction
 15from authentik.core.tests.utils import (
 16    create_test_admin_user,
 17    create_test_cert,
 18    create_test_flow,
 19    create_test_user,
 20)
 21from authentik.crypto.api import CertificateKeyPairSerializer
 22from authentik.crypto.builder import CertificateBuilder
 23from authentik.crypto.models import CertificateKeyPair, generate_key_id, generate_key_id_legacy
 24from authentik.crypto.tasks import MANAGED_DISCOVERED, certificate_discovery
 25from authentik.lib.config import CONFIG
 26from authentik.lib.generators import generate_id, generate_key
 27from authentik.providers.oauth2.models import OAuth2Provider, RedirectURI, RedirectURIMatchingMode
 28
 29
 30class TestCrypto(APITestCase):
 31    """Test Crypto validation"""
 32
 33    def test_model_private(self):
 34        """Test model private key"""
 35        cert = CertificateKeyPair.objects.create(
 36            name=generate_id(),
 37            certificate_data="foo",
 38            key_data="foo",
 39        )
 40        self.assertIsNone(cert.private_key)
 41
 42    def test_serializer(self):
 43        """Test API Validation"""
 44        keypair = create_test_cert()
 45        self.assertTrue(
 46            CertificateKeyPairSerializer(
 47                instance=keypair,
 48                data={
 49                    "name": keypair.name,
 50                    "certificate_data": keypair.certificate_data,
 51                    "key_data": keypair.key_data,
 52                },
 53            ).is_valid()
 54        )
 55        self.assertFalse(
 56            CertificateKeyPairSerializer(
 57                instance=keypair,
 58                data={
 59                    "name": keypair.name,
 60                    "certificate_data": "test",
 61                    "key_data": "test",
 62                },
 63            ).is_valid()
 64        )
 65
 66    def test_builder(self):
 67        """Test Builder"""
 68        name = generate_id()
 69        builder = CertificateBuilder(name)
 70        with self.assertRaises(ValueError):
 71            builder.save()
 72        builder.build(
 73            subject_alt_names=[],
 74            validity_days=3,
 75        )
 76        instance = builder.save()
 77        _now = now()
 78        self.assertEqual(instance.name, name)
 79        self.assertEqual((instance.certificate.not_valid_after_utc - _now).days, 2)
 80
 81    def test_builder_api(self):
 82        """Test Builder (via API)"""
 83        self.client.force_login(create_test_admin_user())
 84        name = generate_id()
 85        self.client.post(
 86            reverse("authentik_api:certificatekeypair-generate"),
 87            data={"common_name": name, "subject_alt_name": "bar,baz", "validity_days": 3},
 88        )
 89        key = CertificateKeyPair.objects.filter(name=name).first()
 90        self.assertIsNotNone(key)
 91        ext: SubjectAlternativeName = key.certificate.extensions[0].value
 92        self.assertIsInstance(ext, SubjectAlternativeName)
 93        self.assertIsInstance(ext[0], DNSName)
 94        self.assertEqual(ext[0].value, "bar")
 95        self.assertIsInstance(ext[1], DNSName)
 96        self.assertEqual(ext[1].value, "baz")
 97
 98    def test_builder_api_duplicate(self):
 99        """Test Builder (via API)"""
100        cert = create_test_cert()
101        self.client.force_login(create_test_admin_user())
102        res = self.client.post(
103            reverse("authentik_api:certificatekeypair-generate"),
104            data={"common_name": cert.name, "subject_alt_name": "bar,baz", "validity_days": 3},
105        )
106        self.assertEqual(res.status_code, 400)
107        self.assertJSONEqual(res.content, {"common_name": ["This field must be unique."]})
108
109    def test_builder_api_empty_san(self):
110        """Test Builder (via API)"""
111        self.client.force_login(create_test_admin_user())
112        name = generate_id()
113        self.client.post(
114            reverse("authentik_api:certificatekeypair-generate"),
115            data={"common_name": name, "subject_alt_name": "", "validity_days": 3},
116        )
117        key = CertificateKeyPair.objects.filter(name=name).first()
118        self.assertIsNotNone(key)
119        self.assertEqual(len(key.certificate.extensions), 0)
120
121    def test_builder_api_empty_san_multiple(self):
122        """Test Builder (via API)"""
123        self.client.force_login(create_test_admin_user())
124        name = generate_id()
125        self.client.post(
126            reverse("authentik_api:certificatekeypair-generate"),
127            data={"common_name": name, "subject_alt_name": ", ", "validity_days": 3},
128        )
129        key = CertificateKeyPair.objects.filter(name=name).first()
130        self.assertIsNotNone(key)
131        self.assertEqual(len(key.certificate.extensions), 0)
132
133    def test_builder_api_invalid(self):
134        """Test Builder (via API) (invalid)"""
135        self.client.force_login(create_test_admin_user())
136        response = self.client.post(
137            reverse("authentik_api:certificatekeypair-generate"),
138            data={},
139        )
140        self.assertEqual(response.status_code, 400)
141
142    def test_list(self):
143        """Test API List"""
144        cert = create_test_cert()
145        self.client.force_login(create_test_admin_user())
146        response = self.client.get(
147            reverse(
148                "authentik_api:certificatekeypair-list",
149            ),
150            data={"name": cert.name},
151        )
152        self.assertEqual(response.status_code, 200)
153        body = loads(response.content.decode())
154        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
155        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
156        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
157
158    def test_list_has_key_false(self):
159        """Test API List with has_key set to false"""
160        cert = create_test_cert()
161        cert.key_data = ""
162        cert.save()
163        self.client.force_login(create_test_admin_user())
164        response = self.client.get(
165            reverse(
166                "authentik_api:certificatekeypair-list",
167            ),
168            data={"name": cert.name, "has_key": False},
169        )
170        self.assertEqual(response.status_code, 200)
171        body = loads(response.content.decode())
172        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
173        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
174        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
175
176    def test_list_always_includes_details(self):
177        """Test API List always includes certificate details"""
178        cert = create_test_cert()
179        self.client.force_login(create_test_admin_user())
180        response = self.client.get(
181            reverse(
182                "authentik_api:certificatekeypair-list",
183            ),
184            data={"name": cert.name},
185        )
186        self.assertEqual(response.status_code, 200)
187        body = loads(response.content.decode())
188        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
189        # All details should now always be included
190        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
191        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
192        self.assertIsNotNone(api_cert["cert_expiry"])
193        self.assertIsNotNone(api_cert["cert_subject"])
194
195    def test_certificate_download(self):
196        """Test certificate export (download)"""
197        keypair = create_test_cert()
198        user = create_test_user()
199        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
200        user.assign_perms_to_managed_role(
201            "authentik_crypto.view_certificatekeypair_certificate", keypair
202        )
203        self.client.force_login(user)
204        response = self.client.get(
205            reverse(
206                "authentik_api:certificatekeypair-view-certificate",
207                kwargs={"pk": keypair.pk},
208            )
209        )
210        self.assertEqual(response.status_code, 200)
211        response = self.client.get(
212            reverse(
213                "authentik_api:certificatekeypair-view-certificate",
214                kwargs={"pk": keypair.pk},
215            ),
216            data={"download": True},
217        )
218        self.assertEqual(response.status_code, 200)
219        self.assertIn("Content-Disposition", response)
220
221    def test_private_key_download(self):
222        """Test private_key export (download)"""
223        keypair = create_test_cert()
224        user = create_test_user()
225        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
226        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
227        self.client.force_login(user)
228        response = self.client.get(
229            reverse(
230                "authentik_api:certificatekeypair-view-private-key",
231                kwargs={"pk": keypair.pk},
232            )
233        )
234        self.assertEqual(response.status_code, 200)
235        response = self.client.get(
236            reverse(
237                "authentik_api:certificatekeypair-view-private-key",
238                kwargs={"pk": keypair.pk},
239            ),
240            data={"download": True},
241        )
242        self.assertEqual(response.status_code, 200)
243        self.assertIn("Content-Disposition", response)
244
245    def test_certificate_download_denied(self):
246        """Test certificate export (download)"""
247        self.client.force_login(create_test_user())
248        keypair = create_test_cert()
249        response = self.client.get(
250            reverse(
251                "authentik_api:certificatekeypair-view-certificate",
252                kwargs={"pk": keypair.pk},
253            )
254        )
255        self.assertEqual(403, response.status_code)
256        response = self.client.get(
257            reverse(
258                "authentik_api:certificatekeypair-view-certificate",
259                kwargs={"pk": keypair.pk},
260            ),
261            data={"download": True},
262        )
263        self.assertEqual(403, response.status_code)
264
265    def test_private_key_download_denied(self):
266        """Test private_key export (download)"""
267        self.client.force_login(create_test_user())
268        keypair = create_test_cert()
269        response = self.client.get(
270            reverse(
271                "authentik_api:certificatekeypair-view-private-key",
272                kwargs={"pk": keypair.pk},
273            )
274        )
275        self.assertEqual(403, response.status_code)
276        response = self.client.get(
277            reverse(
278                "authentik_api:certificatekeypair-view-private-key",
279                kwargs={"pk": keypair.pk},
280            ),
281            data={"download": True},
282        )
283        self.assertEqual(403, response.status_code)
284
285    def test_used_by(self):
286        """Test used_by endpoint"""
287        self.client.force_login(create_test_admin_user())
288        keypair = create_test_cert()
289        provider = OAuth2Provider.objects.create(
290            name=generate_id(),
291            client_id=generate_id(),
292            client_secret=generate_key(),
293            authorization_flow=create_test_flow(),
294            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
295            signing_key=keypair,
296        )
297        response = self.client.get(
298            reverse(
299                "authentik_api:certificatekeypair-used-by",
300                kwargs={"pk": keypair.pk},
301            )
302        )
303        self.assertEqual(response.status_code, 200)
304        self.assertJSONEqual(
305            response.content.decode(),
306            [
307                {
308                    "app": "authentik_providers_oauth2",
309                    "model_name": "oauth2provider",
310                    "pk": str(provider.pk),
311                    "name": str(provider),
312                    "action": DeleteAction.SET_NULL.value,
313                }
314            ],
315        )
316
317    def test_used_by_denied(self):
318        """Test used_by endpoint"""
319        self.client.logout()
320        keypair = create_test_cert()
321        OAuth2Provider.objects.create(
322            name=generate_id(),
323            client_id=generate_id(),
324            client_secret=generate_key(),
325            authorization_flow=create_test_flow(),
326            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
327            signing_key=keypair,
328        )
329        response = self.client.get(
330            reverse(
331                "authentik_api:certificatekeypair-used-by",
332                kwargs={"pk": keypair.pk},
333            )
334        )
335        self.assertEqual(403, response.status_code)
336
337    def test_discovery(self):
338        """Test certificate discovery"""
339        # This test generates 2 separate cert/key combinations
340        # and verifies they both import properly
341        name = generate_id()
342        builder = CertificateBuilder(name)
343        with self.assertRaises(ValueError):
344            builder.save()
345        builder.build(
346            subject_alt_names=[],
347            validity_days=3,
348        )
349
350        name2 = generate_id()
351        builder2 = CertificateBuilder(name2)
352        with self.assertRaises(ValueError):
353            builder2.save()
354        builder2.build(
355            subject_alt_names=[],
356            validity_days=3,
357        )
358
359        name3 = generate_id()
360        builder3 = CertificateBuilder(name3)
361        with self.assertRaises(ValueError):
362            builder3.save()
363        builder3.build(
364            subject_alt_names=[],
365            validity_days=3,
366        )
367
368        with TemporaryDirectory() as temp_dir:
369            with open(f"{temp_dir}/foo.pem", "w+", encoding="utf-8") as _cert:
370                _cert.write(builder.certificate)
371            with open(f"{temp_dir}/foo.key", "w+", encoding="utf-8") as _key:
372                _key.write(builder.private_key)
373            makedirs(f"{temp_dir}/foo.bar", exist_ok=True)
374            with open(f"{temp_dir}/foo.bar/fullchain.pem", "w+", encoding="utf-8") as _cert:
375                _cert.write(builder2.certificate)
376            with open(f"{temp_dir}/foo.bar/privkey.pem", "w+", encoding="utf-8") as _key:
377                _key.write(builder2.private_key)
378            with open(f"{temp_dir}/tls-combined.pem", "w+", encoding="utf-8") as _cert:
379                _cert.write(builder3.certificate)
380            with CONFIG.patch("cert_discovery_dir", temp_dir):
381                certificate_discovery.send()
382        keypair: CertificateKeyPair = CertificateKeyPair.objects.filter(
383            managed=MANAGED_DISCOVERED % "foo"
384        ).first()
385        self.assertIsNotNone(keypair)
386        self.assertIsNotNone(keypair.certificate)
387        self.assertIsNotNone(keypair.private_key)
388        self.assertTrue(
389            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "foo.bar").exists()
390        )
391        self.assertFalse(
392            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "tls-combined").exists()
393        )
394
395    def test_discovery_updating_same_private_key(self):
396        """Test certificate discovery updating certs with matching private keys"""
397        name = generate_id()
398        builder = CertificateBuilder(name)
399        builder.build(
400            subject_alt_names=[],
401            validity_days=3,
402        )
403
404        with TemporaryDirectory() as temp_dir:
405            # First discovery: write cert as "original"
406            with open(f"{temp_dir}/original.pem", "w+", encoding="utf-8") as _cert:
407                _cert.write(builder.certificate)
408            with open(f"{temp_dir}/original.key", "w+", encoding="utf-8") as _key:
409                _key.write(builder.private_key)
410
411            with CONFIG.patch("cert_discovery_dir", temp_dir):
412                certificate_discovery.send()
413
414            # Verify "original" cert was created
415            original = CertificateKeyPair.objects.filter(
416                managed=MANAGED_DISCOVERED % "original"
417            ).first()
418            self.assertIsNotNone(original)
419            self.assertEqual(original.name, "original")
420            self.assertIsNotNone(original.private_key)
421
422            # Second discovery: write same cert/key as "renamed"
423            Path(f"{temp_dir}/original.pem").unlink()
424            Path(f"{temp_dir}/original.key").unlink()
425
426            with open(f"{temp_dir}/renamed.pem", "w+", encoding="utf-8") as _cert:
427                _cert.write(builder.certificate)
428            with open(f"{temp_dir}/renamed.key", "w+", encoding="utf-8") as _key:
429                _key.write(builder.private_key)
430
431            with CONFIG.patch("cert_discovery_dir", temp_dir):
432                certificate_discovery.send()
433
434            # Verify the cert was updated
435            renamed = CertificateKeyPair.objects.filter(
436                managed=MANAGED_DISCOVERED % "renamed"
437            ).first()
438            self.assertIsNotNone(renamed, "Renamed certificate should exist")
439            self.assertEqual(renamed.name, "renamed")
440            self.assertEqual(renamed.pk, original.pk, "Should be same database object")
441
442            # Verify no new cert was created
443            final_count = CertificateKeyPair.objects.filter(
444                managed__startswith="goauthentik.io/crypto/discovered/"
445            ).count()
446            self.assertEqual(
447                1, final_count, "Should not create duplicate cert for same private key"
448            )
449
450    def test_metadata_extraction_with_cert_and_key(self):
451        """Test that metadata is extracted when creating keypair with certificate and key"""
452        cert = create_test_cert()
453
454        # Verify all metadata fields are populated
455        self.assertIsNotNone(cert.key_type)
456        self.assertIsNotNone(cert.cert_expiry)
457        self.assertIsNotNone(cert.cert_subject)
458        self.assertIsNotNone(cert.fingerprint_sha256)
459        self.assertIsNotNone(cert.fingerprint_sha1)
460
461        # Verify kid is generated using SHA512 for new records
462        self.assertIsNotNone(cert.kid)
463        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
464
465    def test_metadata_extraction_without_key(self):
466        """Test that metadata is extracted when creating keypair without private key"""
467        builder = CertificateBuilder(generate_id())
468        builder.build(subject_alt_names=[], validity_days=3)
469
470        # Create keypair with only certificate, no key
471        cert = CertificateKeyPair.objects.create(
472            name=generate_id(),
473            certificate_data=builder.certificate,
474            key_data="",
475        )
476
477        # Verify certificate metadata fields are populated
478        self.assertIsNotNone(cert.key_type)
479        self.assertIsNotNone(cert.cert_expiry)
480        self.assertIsNotNone(cert.cert_subject)
481        self.assertIsNotNone(cert.fingerprint_sha256)
482        self.assertIsNotNone(cert.fingerprint_sha1)
483
484        # Verify kid is empty when no key_data
485        self.assertEqual(cert.kid, None)
486
487    def test_metadata_extraction_invalid_cert(self):
488        """Test that invalid certificate data doesn't crash, just skips metadata"""
489        cert = CertificateKeyPair.objects.create(
490            name=generate_id(),
491            certificate_data="invalid certificate data",
492            key_data="",
493        )
494
495        # Verify metadata fields are None for invalid cert
496        self.assertIsNone(cert.key_type)
497        self.assertIsNone(cert.cert_expiry)
498        self.assertIsNone(cert.cert_subject)
499        self.assertIsNone(cert.fingerprint_sha256)
500        self.assertIsNone(cert.fingerprint_sha1)
501        self.assertIsNone(cert.kid)
502
503    def test_kid_legacy_preservation(self):
504        """Test that legacy MD5 kid is preserved when key_data hasn't changed"""
505        cert = create_test_cert()
506
507        # Simulate a legacy MD5 kid (as if backfilled from old system)
508        legacy_kid = generate_key_id_legacy(cert.key_data)
509        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
510        cert.refresh_from_db()
511        self.assertEqual(cert.kid, legacy_kid)
512
513        # Save the cert again (e.g., name change) - kid should be preserved
514        cert.name = generate_id()
515        cert.save()
516        cert.refresh_from_db()
517
518        self.assertEqual(cert.kid, legacy_kid)
519
520    def test_kid_regenerated_on_key_change(self):
521        """Test that kid is regenerated when key_data changes"""
522        cert = create_test_cert()
523        original_kid = cert.kid
524
525        # Generate a new key and update the keypair
526        builder = CertificateBuilder(generate_id())
527        builder.build(subject_alt_names=[], validity_days=3)
528
529        cert.key_data = builder.private_key
530        cert.certificate_data = builder.certificate
531        cert.save()
532        cert.refresh_from_db()
533
534        # Kid should be regenerated for the new key
535        self.assertNotEqual(cert.kid, original_kid)
536        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
537
538    def test_kid_regenerated_on_key_change_from_legacy(self):
539        """Test that kid is regenerated from legacy MD5 when key_data changes"""
540        cert = create_test_cert()
541
542        # Simulate a legacy MD5 kid
543        legacy_kid = generate_key_id_legacy(cert.key_data)
544        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
545        cert.refresh_from_db()
546        self.assertEqual(cert.kid, legacy_kid)
547
548        # Generate a new key and update the keypair
549        builder = CertificateBuilder(generate_id())
550        builder.build(subject_alt_names=[], validity_days=3)
551
552        cert.key_data = builder.private_key
553        cert.certificate_data = builder.certificate
554        cert.save()
555        cert.refresh_from_db()
556
557        # Kid should now be SHA512 for the new key
558        self.assertNotEqual(cert.kid, legacy_kid)
559        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
class TestCrypto(rest_framework.test.APITestCase):
 31class TestCrypto(APITestCase):
 32    """Test Crypto validation"""
 33
 34    def test_model_private(self):
 35        """Test model private key"""
 36        cert = CertificateKeyPair.objects.create(
 37            name=generate_id(),
 38            certificate_data="foo",
 39            key_data="foo",
 40        )
 41        self.assertIsNone(cert.private_key)
 42
 43    def test_serializer(self):
 44        """Test API Validation"""
 45        keypair = create_test_cert()
 46        self.assertTrue(
 47            CertificateKeyPairSerializer(
 48                instance=keypair,
 49                data={
 50                    "name": keypair.name,
 51                    "certificate_data": keypair.certificate_data,
 52                    "key_data": keypair.key_data,
 53                },
 54            ).is_valid()
 55        )
 56        self.assertFalse(
 57            CertificateKeyPairSerializer(
 58                instance=keypair,
 59                data={
 60                    "name": keypair.name,
 61                    "certificate_data": "test",
 62                    "key_data": "test",
 63                },
 64            ).is_valid()
 65        )
 66
 67    def test_builder(self):
 68        """Test Builder"""
 69        name = generate_id()
 70        builder = CertificateBuilder(name)
 71        with self.assertRaises(ValueError):
 72            builder.save()
 73        builder.build(
 74            subject_alt_names=[],
 75            validity_days=3,
 76        )
 77        instance = builder.save()
 78        _now = now()
 79        self.assertEqual(instance.name, name)
 80        self.assertEqual((instance.certificate.not_valid_after_utc - _now).days, 2)
 81
 82    def test_builder_api(self):
 83        """Test Builder (via API)"""
 84        self.client.force_login(create_test_admin_user())
 85        name = generate_id()
 86        self.client.post(
 87            reverse("authentik_api:certificatekeypair-generate"),
 88            data={"common_name": name, "subject_alt_name": "bar,baz", "validity_days": 3},
 89        )
 90        key = CertificateKeyPair.objects.filter(name=name).first()
 91        self.assertIsNotNone(key)
 92        ext: SubjectAlternativeName = key.certificate.extensions[0].value
 93        self.assertIsInstance(ext, SubjectAlternativeName)
 94        self.assertIsInstance(ext[0], DNSName)
 95        self.assertEqual(ext[0].value, "bar")
 96        self.assertIsInstance(ext[1], DNSName)
 97        self.assertEqual(ext[1].value, "baz")
 98
 99    def test_builder_api_duplicate(self):
100        """Test Builder (via API)"""
101        cert = create_test_cert()
102        self.client.force_login(create_test_admin_user())
103        res = self.client.post(
104            reverse("authentik_api:certificatekeypair-generate"),
105            data={"common_name": cert.name, "subject_alt_name": "bar,baz", "validity_days": 3},
106        )
107        self.assertEqual(res.status_code, 400)
108        self.assertJSONEqual(res.content, {"common_name": ["This field must be unique."]})
109
110    def test_builder_api_empty_san(self):
111        """Test Builder (via API)"""
112        self.client.force_login(create_test_admin_user())
113        name = generate_id()
114        self.client.post(
115            reverse("authentik_api:certificatekeypair-generate"),
116            data={"common_name": name, "subject_alt_name": "", "validity_days": 3},
117        )
118        key = CertificateKeyPair.objects.filter(name=name).first()
119        self.assertIsNotNone(key)
120        self.assertEqual(len(key.certificate.extensions), 0)
121
122    def test_builder_api_empty_san_multiple(self):
123        """Test Builder (via API)"""
124        self.client.force_login(create_test_admin_user())
125        name = generate_id()
126        self.client.post(
127            reverse("authentik_api:certificatekeypair-generate"),
128            data={"common_name": name, "subject_alt_name": ", ", "validity_days": 3},
129        )
130        key = CertificateKeyPair.objects.filter(name=name).first()
131        self.assertIsNotNone(key)
132        self.assertEqual(len(key.certificate.extensions), 0)
133
134    def test_builder_api_invalid(self):
135        """Test Builder (via API) (invalid)"""
136        self.client.force_login(create_test_admin_user())
137        response = self.client.post(
138            reverse("authentik_api:certificatekeypair-generate"),
139            data={},
140        )
141        self.assertEqual(response.status_code, 400)
142
143    def test_list(self):
144        """Test API List"""
145        cert = create_test_cert()
146        self.client.force_login(create_test_admin_user())
147        response = self.client.get(
148            reverse(
149                "authentik_api:certificatekeypair-list",
150            ),
151            data={"name": cert.name},
152        )
153        self.assertEqual(response.status_code, 200)
154        body = loads(response.content.decode())
155        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
156        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
157        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
158
159    def test_list_has_key_false(self):
160        """Test API List with has_key set to false"""
161        cert = create_test_cert()
162        cert.key_data = ""
163        cert.save()
164        self.client.force_login(create_test_admin_user())
165        response = self.client.get(
166            reverse(
167                "authentik_api:certificatekeypair-list",
168            ),
169            data={"name": cert.name, "has_key": False},
170        )
171        self.assertEqual(response.status_code, 200)
172        body = loads(response.content.decode())
173        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
174        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
175        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
176
177    def test_list_always_includes_details(self):
178        """Test API List always includes certificate details"""
179        cert = create_test_cert()
180        self.client.force_login(create_test_admin_user())
181        response = self.client.get(
182            reverse(
183                "authentik_api:certificatekeypair-list",
184            ),
185            data={"name": cert.name},
186        )
187        self.assertEqual(response.status_code, 200)
188        body = loads(response.content.decode())
189        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
190        # All details should now always be included
191        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
192        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
193        self.assertIsNotNone(api_cert["cert_expiry"])
194        self.assertIsNotNone(api_cert["cert_subject"])
195
196    def test_certificate_download(self):
197        """Test certificate export (download)"""
198        keypair = create_test_cert()
199        user = create_test_user()
200        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
201        user.assign_perms_to_managed_role(
202            "authentik_crypto.view_certificatekeypair_certificate", keypair
203        )
204        self.client.force_login(user)
205        response = self.client.get(
206            reverse(
207                "authentik_api:certificatekeypair-view-certificate",
208                kwargs={"pk": keypair.pk},
209            )
210        )
211        self.assertEqual(response.status_code, 200)
212        response = self.client.get(
213            reverse(
214                "authentik_api:certificatekeypair-view-certificate",
215                kwargs={"pk": keypair.pk},
216            ),
217            data={"download": True},
218        )
219        self.assertEqual(response.status_code, 200)
220        self.assertIn("Content-Disposition", response)
221
222    def test_private_key_download(self):
223        """Test private_key export (download)"""
224        keypair = create_test_cert()
225        user = create_test_user()
226        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
227        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
228        self.client.force_login(user)
229        response = self.client.get(
230            reverse(
231                "authentik_api:certificatekeypair-view-private-key",
232                kwargs={"pk": keypair.pk},
233            )
234        )
235        self.assertEqual(response.status_code, 200)
236        response = self.client.get(
237            reverse(
238                "authentik_api:certificatekeypair-view-private-key",
239                kwargs={"pk": keypair.pk},
240            ),
241            data={"download": True},
242        )
243        self.assertEqual(response.status_code, 200)
244        self.assertIn("Content-Disposition", response)
245
246    def test_certificate_download_denied(self):
247        """Test certificate export (download)"""
248        self.client.force_login(create_test_user())
249        keypair = create_test_cert()
250        response = self.client.get(
251            reverse(
252                "authentik_api:certificatekeypair-view-certificate",
253                kwargs={"pk": keypair.pk},
254            )
255        )
256        self.assertEqual(403, response.status_code)
257        response = self.client.get(
258            reverse(
259                "authentik_api:certificatekeypair-view-certificate",
260                kwargs={"pk": keypair.pk},
261            ),
262            data={"download": True},
263        )
264        self.assertEqual(403, response.status_code)
265
266    def test_private_key_download_denied(self):
267        """Test private_key export (download)"""
268        self.client.force_login(create_test_user())
269        keypair = create_test_cert()
270        response = self.client.get(
271            reverse(
272                "authentik_api:certificatekeypair-view-private-key",
273                kwargs={"pk": keypair.pk},
274            )
275        )
276        self.assertEqual(403, response.status_code)
277        response = self.client.get(
278            reverse(
279                "authentik_api:certificatekeypair-view-private-key",
280                kwargs={"pk": keypair.pk},
281            ),
282            data={"download": True},
283        )
284        self.assertEqual(403, response.status_code)
285
286    def test_used_by(self):
287        """Test used_by endpoint"""
288        self.client.force_login(create_test_admin_user())
289        keypair = create_test_cert()
290        provider = OAuth2Provider.objects.create(
291            name=generate_id(),
292            client_id=generate_id(),
293            client_secret=generate_key(),
294            authorization_flow=create_test_flow(),
295            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
296            signing_key=keypair,
297        )
298        response = self.client.get(
299            reverse(
300                "authentik_api:certificatekeypair-used-by",
301                kwargs={"pk": keypair.pk},
302            )
303        )
304        self.assertEqual(response.status_code, 200)
305        self.assertJSONEqual(
306            response.content.decode(),
307            [
308                {
309                    "app": "authentik_providers_oauth2",
310                    "model_name": "oauth2provider",
311                    "pk": str(provider.pk),
312                    "name": str(provider),
313                    "action": DeleteAction.SET_NULL.value,
314                }
315            ],
316        )
317
318    def test_used_by_denied(self):
319        """Test used_by endpoint"""
320        self.client.logout()
321        keypair = create_test_cert()
322        OAuth2Provider.objects.create(
323            name=generate_id(),
324            client_id=generate_id(),
325            client_secret=generate_key(),
326            authorization_flow=create_test_flow(),
327            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
328            signing_key=keypair,
329        )
330        response = self.client.get(
331            reverse(
332                "authentik_api:certificatekeypair-used-by",
333                kwargs={"pk": keypair.pk},
334            )
335        )
336        self.assertEqual(403, response.status_code)
337
338    def test_discovery(self):
339        """Test certificate discovery"""
340        # This test generates 2 separate cert/key combinations
341        # and verifies they both import properly
342        name = generate_id()
343        builder = CertificateBuilder(name)
344        with self.assertRaises(ValueError):
345            builder.save()
346        builder.build(
347            subject_alt_names=[],
348            validity_days=3,
349        )
350
351        name2 = generate_id()
352        builder2 = CertificateBuilder(name2)
353        with self.assertRaises(ValueError):
354            builder2.save()
355        builder2.build(
356            subject_alt_names=[],
357            validity_days=3,
358        )
359
360        name3 = generate_id()
361        builder3 = CertificateBuilder(name3)
362        with self.assertRaises(ValueError):
363            builder3.save()
364        builder3.build(
365            subject_alt_names=[],
366            validity_days=3,
367        )
368
369        with TemporaryDirectory() as temp_dir:
370            with open(f"{temp_dir}/foo.pem", "w+", encoding="utf-8") as _cert:
371                _cert.write(builder.certificate)
372            with open(f"{temp_dir}/foo.key", "w+", encoding="utf-8") as _key:
373                _key.write(builder.private_key)
374            makedirs(f"{temp_dir}/foo.bar", exist_ok=True)
375            with open(f"{temp_dir}/foo.bar/fullchain.pem", "w+", encoding="utf-8") as _cert:
376                _cert.write(builder2.certificate)
377            with open(f"{temp_dir}/foo.bar/privkey.pem", "w+", encoding="utf-8") as _key:
378                _key.write(builder2.private_key)
379            with open(f"{temp_dir}/tls-combined.pem", "w+", encoding="utf-8") as _cert:
380                _cert.write(builder3.certificate)
381            with CONFIG.patch("cert_discovery_dir", temp_dir):
382                certificate_discovery.send()
383        keypair: CertificateKeyPair = CertificateKeyPair.objects.filter(
384            managed=MANAGED_DISCOVERED % "foo"
385        ).first()
386        self.assertIsNotNone(keypair)
387        self.assertIsNotNone(keypair.certificate)
388        self.assertIsNotNone(keypair.private_key)
389        self.assertTrue(
390            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "foo.bar").exists()
391        )
392        self.assertFalse(
393            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "tls-combined").exists()
394        )
395
396    def test_discovery_updating_same_private_key(self):
397        """Test certificate discovery updating certs with matching private keys"""
398        name = generate_id()
399        builder = CertificateBuilder(name)
400        builder.build(
401            subject_alt_names=[],
402            validity_days=3,
403        )
404
405        with TemporaryDirectory() as temp_dir:
406            # First discovery: write cert as "original"
407            with open(f"{temp_dir}/original.pem", "w+", encoding="utf-8") as _cert:
408                _cert.write(builder.certificate)
409            with open(f"{temp_dir}/original.key", "w+", encoding="utf-8") as _key:
410                _key.write(builder.private_key)
411
412            with CONFIG.patch("cert_discovery_dir", temp_dir):
413                certificate_discovery.send()
414
415            # Verify "original" cert was created
416            original = CertificateKeyPair.objects.filter(
417                managed=MANAGED_DISCOVERED % "original"
418            ).first()
419            self.assertIsNotNone(original)
420            self.assertEqual(original.name, "original")
421            self.assertIsNotNone(original.private_key)
422
423            # Second discovery: write same cert/key as "renamed"
424            Path(f"{temp_dir}/original.pem").unlink()
425            Path(f"{temp_dir}/original.key").unlink()
426
427            with open(f"{temp_dir}/renamed.pem", "w+", encoding="utf-8") as _cert:
428                _cert.write(builder.certificate)
429            with open(f"{temp_dir}/renamed.key", "w+", encoding="utf-8") as _key:
430                _key.write(builder.private_key)
431
432            with CONFIG.patch("cert_discovery_dir", temp_dir):
433                certificate_discovery.send()
434
435            # Verify the cert was updated
436            renamed = CertificateKeyPair.objects.filter(
437                managed=MANAGED_DISCOVERED % "renamed"
438            ).first()
439            self.assertIsNotNone(renamed, "Renamed certificate should exist")
440            self.assertEqual(renamed.name, "renamed")
441            self.assertEqual(renamed.pk, original.pk, "Should be same database object")
442
443            # Verify no new cert was created
444            final_count = CertificateKeyPair.objects.filter(
445                managed__startswith="goauthentik.io/crypto/discovered/"
446            ).count()
447            self.assertEqual(
448                1, final_count, "Should not create duplicate cert for same private key"
449            )
450
451    def test_metadata_extraction_with_cert_and_key(self):
452        """Test that metadata is extracted when creating keypair with certificate and key"""
453        cert = create_test_cert()
454
455        # Verify all metadata fields are populated
456        self.assertIsNotNone(cert.key_type)
457        self.assertIsNotNone(cert.cert_expiry)
458        self.assertIsNotNone(cert.cert_subject)
459        self.assertIsNotNone(cert.fingerprint_sha256)
460        self.assertIsNotNone(cert.fingerprint_sha1)
461
462        # Verify kid is generated using SHA512 for new records
463        self.assertIsNotNone(cert.kid)
464        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
465
466    def test_metadata_extraction_without_key(self):
467        """Test that metadata is extracted when creating keypair without private key"""
468        builder = CertificateBuilder(generate_id())
469        builder.build(subject_alt_names=[], validity_days=3)
470
471        # Create keypair with only certificate, no key
472        cert = CertificateKeyPair.objects.create(
473            name=generate_id(),
474            certificate_data=builder.certificate,
475            key_data="",
476        )
477
478        # Verify certificate metadata fields are populated
479        self.assertIsNotNone(cert.key_type)
480        self.assertIsNotNone(cert.cert_expiry)
481        self.assertIsNotNone(cert.cert_subject)
482        self.assertIsNotNone(cert.fingerprint_sha256)
483        self.assertIsNotNone(cert.fingerprint_sha1)
484
485        # Verify kid is empty when no key_data
486        self.assertEqual(cert.kid, None)
487
488    def test_metadata_extraction_invalid_cert(self):
489        """Test that invalid certificate data doesn't crash, just skips metadata"""
490        cert = CertificateKeyPair.objects.create(
491            name=generate_id(),
492            certificate_data="invalid certificate data",
493            key_data="",
494        )
495
496        # Verify metadata fields are None for invalid cert
497        self.assertIsNone(cert.key_type)
498        self.assertIsNone(cert.cert_expiry)
499        self.assertIsNone(cert.cert_subject)
500        self.assertIsNone(cert.fingerprint_sha256)
501        self.assertIsNone(cert.fingerprint_sha1)
502        self.assertIsNone(cert.kid)
503
504    def test_kid_legacy_preservation(self):
505        """Test that legacy MD5 kid is preserved when key_data hasn't changed"""
506        cert = create_test_cert()
507
508        # Simulate a legacy MD5 kid (as if backfilled from old system)
509        legacy_kid = generate_key_id_legacy(cert.key_data)
510        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
511        cert.refresh_from_db()
512        self.assertEqual(cert.kid, legacy_kid)
513
514        # Save the cert again (e.g., name change) - kid should be preserved
515        cert.name = generate_id()
516        cert.save()
517        cert.refresh_from_db()
518
519        self.assertEqual(cert.kid, legacy_kid)
520
521    def test_kid_regenerated_on_key_change(self):
522        """Test that kid is regenerated when key_data changes"""
523        cert = create_test_cert()
524        original_kid = cert.kid
525
526        # Generate a new key and update the keypair
527        builder = CertificateBuilder(generate_id())
528        builder.build(subject_alt_names=[], validity_days=3)
529
530        cert.key_data = builder.private_key
531        cert.certificate_data = builder.certificate
532        cert.save()
533        cert.refresh_from_db()
534
535        # Kid should be regenerated for the new key
536        self.assertNotEqual(cert.kid, original_kid)
537        self.assertEqual(cert.kid, generate_key_id(cert.key_data))
538
539    def test_kid_regenerated_on_key_change_from_legacy(self):
540        """Test that kid is regenerated from legacy MD5 when key_data changes"""
541        cert = create_test_cert()
542
543        # Simulate a legacy MD5 kid
544        legacy_kid = generate_key_id_legacy(cert.key_data)
545        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
546        cert.refresh_from_db()
547        self.assertEqual(cert.kid, legacy_kid)
548
549        # Generate a new key and update the keypair
550        builder = CertificateBuilder(generate_id())
551        builder.build(subject_alt_names=[], validity_days=3)
552
553        cert.key_data = builder.private_key
554        cert.certificate_data = builder.certificate
555        cert.save()
556        cert.refresh_from_db()
557
558        # Kid should now be SHA512 for the new key
559        self.assertNotEqual(cert.kid, legacy_kid)
560        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

Test Crypto validation

def test_model_private(self):
34    def test_model_private(self):
35        """Test model private key"""
36        cert = CertificateKeyPair.objects.create(
37            name=generate_id(),
38            certificate_data="foo",
39            key_data="foo",
40        )
41        self.assertIsNone(cert.private_key)

Test model private key

def test_serializer(self):
43    def test_serializer(self):
44        """Test API Validation"""
45        keypair = create_test_cert()
46        self.assertTrue(
47            CertificateKeyPairSerializer(
48                instance=keypair,
49                data={
50                    "name": keypair.name,
51                    "certificate_data": keypair.certificate_data,
52                    "key_data": keypair.key_data,
53                },
54            ).is_valid()
55        )
56        self.assertFalse(
57            CertificateKeyPairSerializer(
58                instance=keypair,
59                data={
60                    "name": keypair.name,
61                    "certificate_data": "test",
62                    "key_data": "test",
63                },
64            ).is_valid()
65        )

Test API Validation

def test_builder(self):
67    def test_builder(self):
68        """Test Builder"""
69        name = generate_id()
70        builder = CertificateBuilder(name)
71        with self.assertRaises(ValueError):
72            builder.save()
73        builder.build(
74            subject_alt_names=[],
75            validity_days=3,
76        )
77        instance = builder.save()
78        _now = now()
79        self.assertEqual(instance.name, name)
80        self.assertEqual((instance.certificate.not_valid_after_utc - _now).days, 2)

Test Builder

def test_builder_api(self):
82    def test_builder_api(self):
83        """Test Builder (via API)"""
84        self.client.force_login(create_test_admin_user())
85        name = generate_id()
86        self.client.post(
87            reverse("authentik_api:certificatekeypair-generate"),
88            data={"common_name": name, "subject_alt_name": "bar,baz", "validity_days": 3},
89        )
90        key = CertificateKeyPair.objects.filter(name=name).first()
91        self.assertIsNotNone(key)
92        ext: SubjectAlternativeName = key.certificate.extensions[0].value
93        self.assertIsInstance(ext, SubjectAlternativeName)
94        self.assertIsInstance(ext[0], DNSName)
95        self.assertEqual(ext[0].value, "bar")
96        self.assertIsInstance(ext[1], DNSName)
97        self.assertEqual(ext[1].value, "baz")

Test Builder (via API)

def test_builder_api_duplicate(self):
 99    def test_builder_api_duplicate(self):
100        """Test Builder (via API)"""
101        cert = create_test_cert()
102        self.client.force_login(create_test_admin_user())
103        res = self.client.post(
104            reverse("authentik_api:certificatekeypair-generate"),
105            data={"common_name": cert.name, "subject_alt_name": "bar,baz", "validity_days": 3},
106        )
107        self.assertEqual(res.status_code, 400)
108        self.assertJSONEqual(res.content, {"common_name": ["This field must be unique."]})

Test Builder (via API)

def test_builder_api_empty_san(self):
110    def test_builder_api_empty_san(self):
111        """Test Builder (via API)"""
112        self.client.force_login(create_test_admin_user())
113        name = generate_id()
114        self.client.post(
115            reverse("authentik_api:certificatekeypair-generate"),
116            data={"common_name": name, "subject_alt_name": "", "validity_days": 3},
117        )
118        key = CertificateKeyPair.objects.filter(name=name).first()
119        self.assertIsNotNone(key)
120        self.assertEqual(len(key.certificate.extensions), 0)

Test Builder (via API)

def test_builder_api_empty_san_multiple(self):
122    def test_builder_api_empty_san_multiple(self):
123        """Test Builder (via API)"""
124        self.client.force_login(create_test_admin_user())
125        name = generate_id()
126        self.client.post(
127            reverse("authentik_api:certificatekeypair-generate"),
128            data={"common_name": name, "subject_alt_name": ", ", "validity_days": 3},
129        )
130        key = CertificateKeyPair.objects.filter(name=name).first()
131        self.assertIsNotNone(key)
132        self.assertEqual(len(key.certificate.extensions), 0)

Test Builder (via API)

def test_builder_api_invalid(self):
134    def test_builder_api_invalid(self):
135        """Test Builder (via API) (invalid)"""
136        self.client.force_login(create_test_admin_user())
137        response = self.client.post(
138            reverse("authentik_api:certificatekeypair-generate"),
139            data={},
140        )
141        self.assertEqual(response.status_code, 400)

Test Builder (via API) (invalid)

def test_list(self):
143    def test_list(self):
144        """Test API List"""
145        cert = create_test_cert()
146        self.client.force_login(create_test_admin_user())
147        response = self.client.get(
148            reverse(
149                "authentik_api:certificatekeypair-list",
150            ),
151            data={"name": cert.name},
152        )
153        self.assertEqual(response.status_code, 200)
154        body = loads(response.content.decode())
155        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
156        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
157        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)

Test API List

def test_list_has_key_false(self):
159    def test_list_has_key_false(self):
160        """Test API List with has_key set to false"""
161        cert = create_test_cert()
162        cert.key_data = ""
163        cert.save()
164        self.client.force_login(create_test_admin_user())
165        response = self.client.get(
166            reverse(
167                "authentik_api:certificatekeypair-list",
168            ),
169            data={"name": cert.name, "has_key": False},
170        )
171        self.assertEqual(response.status_code, 200)
172        body = loads(response.content.decode())
173        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
174        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
175        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)

Test API List with has_key set to false

def test_list_always_includes_details(self):
177    def test_list_always_includes_details(self):
178        """Test API List always includes certificate details"""
179        cert = create_test_cert()
180        self.client.force_login(create_test_admin_user())
181        response = self.client.get(
182            reverse(
183                "authentik_api:certificatekeypair-list",
184            ),
185            data={"name": cert.name},
186        )
187        self.assertEqual(response.status_code, 200)
188        body = loads(response.content.decode())
189        api_cert = [x for x in body["results"] if x["name"] == cert.name][0]
190        # All details should now always be included
191        self.assertEqual(api_cert["fingerprint_sha1"], cert.fingerprint_sha1)
192        self.assertEqual(api_cert["fingerprint_sha256"], cert.fingerprint_sha256)
193        self.assertIsNotNone(api_cert["cert_expiry"])
194        self.assertIsNotNone(api_cert["cert_subject"])

Test API List always includes certificate details

def test_certificate_download(self):
196    def test_certificate_download(self):
197        """Test certificate export (download)"""
198        keypair = create_test_cert()
199        user = create_test_user()
200        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
201        user.assign_perms_to_managed_role(
202            "authentik_crypto.view_certificatekeypair_certificate", keypair
203        )
204        self.client.force_login(user)
205        response = self.client.get(
206            reverse(
207                "authentik_api:certificatekeypair-view-certificate",
208                kwargs={"pk": keypair.pk},
209            )
210        )
211        self.assertEqual(response.status_code, 200)
212        response = self.client.get(
213            reverse(
214                "authentik_api:certificatekeypair-view-certificate",
215                kwargs={"pk": keypair.pk},
216            ),
217            data={"download": True},
218        )
219        self.assertEqual(response.status_code, 200)
220        self.assertIn("Content-Disposition", response)

Test certificate export (download)

def test_private_key_download(self):
222    def test_private_key_download(self):
223        """Test private_key export (download)"""
224        keypair = create_test_cert()
225        user = create_test_user()
226        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair", keypair)
227        user.assign_perms_to_managed_role("authentik_crypto.view_certificatekeypair_key", keypair)
228        self.client.force_login(user)
229        response = self.client.get(
230            reverse(
231                "authentik_api:certificatekeypair-view-private-key",
232                kwargs={"pk": keypair.pk},
233            )
234        )
235        self.assertEqual(response.status_code, 200)
236        response = self.client.get(
237            reverse(
238                "authentik_api:certificatekeypair-view-private-key",
239                kwargs={"pk": keypair.pk},
240            ),
241            data={"download": True},
242        )
243        self.assertEqual(response.status_code, 200)
244        self.assertIn("Content-Disposition", response)

Test private_key export (download)

def test_certificate_download_denied(self):
246    def test_certificate_download_denied(self):
247        """Test certificate export (download)"""
248        self.client.force_login(create_test_user())
249        keypair = create_test_cert()
250        response = self.client.get(
251            reverse(
252                "authentik_api:certificatekeypair-view-certificate",
253                kwargs={"pk": keypair.pk},
254            )
255        )
256        self.assertEqual(403, response.status_code)
257        response = self.client.get(
258            reverse(
259                "authentik_api:certificatekeypair-view-certificate",
260                kwargs={"pk": keypair.pk},
261            ),
262            data={"download": True},
263        )
264        self.assertEqual(403, response.status_code)

Test certificate export (download)

def test_private_key_download_denied(self):
266    def test_private_key_download_denied(self):
267        """Test private_key export (download)"""
268        self.client.force_login(create_test_user())
269        keypair = create_test_cert()
270        response = self.client.get(
271            reverse(
272                "authentik_api:certificatekeypair-view-private-key",
273                kwargs={"pk": keypair.pk},
274            )
275        )
276        self.assertEqual(403, response.status_code)
277        response = self.client.get(
278            reverse(
279                "authentik_api:certificatekeypair-view-private-key",
280                kwargs={"pk": keypair.pk},
281            ),
282            data={"download": True},
283        )
284        self.assertEqual(403, response.status_code)

Test private_key export (download)

def test_used_by(self):
286    def test_used_by(self):
287        """Test used_by endpoint"""
288        self.client.force_login(create_test_admin_user())
289        keypair = create_test_cert()
290        provider = OAuth2Provider.objects.create(
291            name=generate_id(),
292            client_id=generate_id(),
293            client_secret=generate_key(),
294            authorization_flow=create_test_flow(),
295            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
296            signing_key=keypair,
297        )
298        response = self.client.get(
299            reverse(
300                "authentik_api:certificatekeypair-used-by",
301                kwargs={"pk": keypair.pk},
302            )
303        )
304        self.assertEqual(response.status_code, 200)
305        self.assertJSONEqual(
306            response.content.decode(),
307            [
308                {
309                    "app": "authentik_providers_oauth2",
310                    "model_name": "oauth2provider",
311                    "pk": str(provider.pk),
312                    "name": str(provider),
313                    "action": DeleteAction.SET_NULL.value,
314                }
315            ],
316        )

Test used_by endpoint

def test_used_by_denied(self):
318    def test_used_by_denied(self):
319        """Test used_by endpoint"""
320        self.client.logout()
321        keypair = create_test_cert()
322        OAuth2Provider.objects.create(
323            name=generate_id(),
324            client_id=generate_id(),
325            client_secret=generate_key(),
326            authorization_flow=create_test_flow(),
327            redirect_uris=[RedirectURI(RedirectURIMatchingMode.STRICT, "http://localhost")],
328            signing_key=keypair,
329        )
330        response = self.client.get(
331            reverse(
332                "authentik_api:certificatekeypair-used-by",
333                kwargs={"pk": keypair.pk},
334            )
335        )
336        self.assertEqual(403, response.status_code)

Test used_by endpoint

def test_discovery(self):
338    def test_discovery(self):
339        """Test certificate discovery"""
340        # This test generates 2 separate cert/key combinations
341        # and verifies they both import properly
342        name = generate_id()
343        builder = CertificateBuilder(name)
344        with self.assertRaises(ValueError):
345            builder.save()
346        builder.build(
347            subject_alt_names=[],
348            validity_days=3,
349        )
350
351        name2 = generate_id()
352        builder2 = CertificateBuilder(name2)
353        with self.assertRaises(ValueError):
354            builder2.save()
355        builder2.build(
356            subject_alt_names=[],
357            validity_days=3,
358        )
359
360        name3 = generate_id()
361        builder3 = CertificateBuilder(name3)
362        with self.assertRaises(ValueError):
363            builder3.save()
364        builder3.build(
365            subject_alt_names=[],
366            validity_days=3,
367        )
368
369        with TemporaryDirectory() as temp_dir:
370            with open(f"{temp_dir}/foo.pem", "w+", encoding="utf-8") as _cert:
371                _cert.write(builder.certificate)
372            with open(f"{temp_dir}/foo.key", "w+", encoding="utf-8") as _key:
373                _key.write(builder.private_key)
374            makedirs(f"{temp_dir}/foo.bar", exist_ok=True)
375            with open(f"{temp_dir}/foo.bar/fullchain.pem", "w+", encoding="utf-8") as _cert:
376                _cert.write(builder2.certificate)
377            with open(f"{temp_dir}/foo.bar/privkey.pem", "w+", encoding="utf-8") as _key:
378                _key.write(builder2.private_key)
379            with open(f"{temp_dir}/tls-combined.pem", "w+", encoding="utf-8") as _cert:
380                _cert.write(builder3.certificate)
381            with CONFIG.patch("cert_discovery_dir", temp_dir):
382                certificate_discovery.send()
383        keypair: CertificateKeyPair = CertificateKeyPair.objects.filter(
384            managed=MANAGED_DISCOVERED % "foo"
385        ).first()
386        self.assertIsNotNone(keypair)
387        self.assertIsNotNone(keypair.certificate)
388        self.assertIsNotNone(keypair.private_key)
389        self.assertTrue(
390            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "foo.bar").exists()
391        )
392        self.assertFalse(
393            CertificateKeyPair.objects.filter(managed=MANAGED_DISCOVERED % "tls-combined").exists()
394        )

Test certificate discovery

def test_discovery_updating_same_private_key(self):
396    def test_discovery_updating_same_private_key(self):
397        """Test certificate discovery updating certs with matching private keys"""
398        name = generate_id()
399        builder = CertificateBuilder(name)
400        builder.build(
401            subject_alt_names=[],
402            validity_days=3,
403        )
404
405        with TemporaryDirectory() as temp_dir:
406            # First discovery: write cert as "original"
407            with open(f"{temp_dir}/original.pem", "w+", encoding="utf-8") as _cert:
408                _cert.write(builder.certificate)
409            with open(f"{temp_dir}/original.key", "w+", encoding="utf-8") as _key:
410                _key.write(builder.private_key)
411
412            with CONFIG.patch("cert_discovery_dir", temp_dir):
413                certificate_discovery.send()
414
415            # Verify "original" cert was created
416            original = CertificateKeyPair.objects.filter(
417                managed=MANAGED_DISCOVERED % "original"
418            ).first()
419            self.assertIsNotNone(original)
420            self.assertEqual(original.name, "original")
421            self.assertIsNotNone(original.private_key)
422
423            # Second discovery: write same cert/key as "renamed"
424            Path(f"{temp_dir}/original.pem").unlink()
425            Path(f"{temp_dir}/original.key").unlink()
426
427            with open(f"{temp_dir}/renamed.pem", "w+", encoding="utf-8") as _cert:
428                _cert.write(builder.certificate)
429            with open(f"{temp_dir}/renamed.key", "w+", encoding="utf-8") as _key:
430                _key.write(builder.private_key)
431
432            with CONFIG.patch("cert_discovery_dir", temp_dir):
433                certificate_discovery.send()
434
435            # Verify the cert was updated
436            renamed = CertificateKeyPair.objects.filter(
437                managed=MANAGED_DISCOVERED % "renamed"
438            ).first()
439            self.assertIsNotNone(renamed, "Renamed certificate should exist")
440            self.assertEqual(renamed.name, "renamed")
441            self.assertEqual(renamed.pk, original.pk, "Should be same database object")
442
443            # Verify no new cert was created
444            final_count = CertificateKeyPair.objects.filter(
445                managed__startswith="goauthentik.io/crypto/discovered/"
446            ).count()
447            self.assertEqual(
448                1, final_count, "Should not create duplicate cert for same private key"
449            )

Test certificate discovery updating certs with matching private keys

def test_metadata_extraction_with_cert_and_key(self):
451    def test_metadata_extraction_with_cert_and_key(self):
452        """Test that metadata is extracted when creating keypair with certificate and key"""
453        cert = create_test_cert()
454
455        # Verify all metadata fields are populated
456        self.assertIsNotNone(cert.key_type)
457        self.assertIsNotNone(cert.cert_expiry)
458        self.assertIsNotNone(cert.cert_subject)
459        self.assertIsNotNone(cert.fingerprint_sha256)
460        self.assertIsNotNone(cert.fingerprint_sha1)
461
462        # Verify kid is generated using SHA512 for new records
463        self.assertIsNotNone(cert.kid)
464        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

Test that metadata is extracted when creating keypair with certificate and key

def test_metadata_extraction_without_key(self):
466    def test_metadata_extraction_without_key(self):
467        """Test that metadata is extracted when creating keypair without private key"""
468        builder = CertificateBuilder(generate_id())
469        builder.build(subject_alt_names=[], validity_days=3)
470
471        # Create keypair with only certificate, no key
472        cert = CertificateKeyPair.objects.create(
473            name=generate_id(),
474            certificate_data=builder.certificate,
475            key_data="",
476        )
477
478        # Verify certificate metadata fields are populated
479        self.assertIsNotNone(cert.key_type)
480        self.assertIsNotNone(cert.cert_expiry)
481        self.assertIsNotNone(cert.cert_subject)
482        self.assertIsNotNone(cert.fingerprint_sha256)
483        self.assertIsNotNone(cert.fingerprint_sha1)
484
485        # Verify kid is empty when no key_data
486        self.assertEqual(cert.kid, None)

Test that metadata is extracted when creating keypair without private key

def test_metadata_extraction_invalid_cert(self):
488    def test_metadata_extraction_invalid_cert(self):
489        """Test that invalid certificate data doesn't crash, just skips metadata"""
490        cert = CertificateKeyPair.objects.create(
491            name=generate_id(),
492            certificate_data="invalid certificate data",
493            key_data="",
494        )
495
496        # Verify metadata fields are None for invalid cert
497        self.assertIsNone(cert.key_type)
498        self.assertIsNone(cert.cert_expiry)
499        self.assertIsNone(cert.cert_subject)
500        self.assertIsNone(cert.fingerprint_sha256)
501        self.assertIsNone(cert.fingerprint_sha1)
502        self.assertIsNone(cert.kid)

Test that invalid certificate data doesn't crash, just skips metadata

def test_kid_legacy_preservation(self):
504    def test_kid_legacy_preservation(self):
505        """Test that legacy MD5 kid is preserved when key_data hasn't changed"""
506        cert = create_test_cert()
507
508        # Simulate a legacy MD5 kid (as if backfilled from old system)
509        legacy_kid = generate_key_id_legacy(cert.key_data)
510        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
511        cert.refresh_from_db()
512        self.assertEqual(cert.kid, legacy_kid)
513
514        # Save the cert again (e.g., name change) - kid should be preserved
515        cert.name = generate_id()
516        cert.save()
517        cert.refresh_from_db()
518
519        self.assertEqual(cert.kid, legacy_kid)

Test that legacy MD5 kid is preserved when key_data hasn't changed

def test_kid_regenerated_on_key_change(self):
521    def test_kid_regenerated_on_key_change(self):
522        """Test that kid is regenerated when key_data changes"""
523        cert = create_test_cert()
524        original_kid = cert.kid
525
526        # Generate a new key and update the keypair
527        builder = CertificateBuilder(generate_id())
528        builder.build(subject_alt_names=[], validity_days=3)
529
530        cert.key_data = builder.private_key
531        cert.certificate_data = builder.certificate
532        cert.save()
533        cert.refresh_from_db()
534
535        # Kid should be regenerated for the new key
536        self.assertNotEqual(cert.kid, original_kid)
537        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

Test that kid is regenerated when key_data changes

def test_kid_regenerated_on_key_change_from_legacy(self):
539    def test_kid_regenerated_on_key_change_from_legacy(self):
540        """Test that kid is regenerated from legacy MD5 when key_data changes"""
541        cert = create_test_cert()
542
543        # Simulate a legacy MD5 kid
544        legacy_kid = generate_key_id_legacy(cert.key_data)
545        CertificateKeyPair.objects.filter(pk=cert.pk).update(kid=legacy_kid)
546        cert.refresh_from_db()
547        self.assertEqual(cert.kid, legacy_kid)
548
549        # Generate a new key and update the keypair
550        builder = CertificateBuilder(generate_id())
551        builder.build(subject_alt_names=[], validity_days=3)
552
553        cert.key_data = builder.private_key
554        cert.certificate_data = builder.certificate
555        cert.save()
556        cert.refresh_from_db()
557
558        # Kid should now be SHA512 for the new key
559        self.assertNotEqual(cert.kid, legacy_kid)
560        self.assertEqual(cert.kid, generate_key_id(cert.key_data))

Test that kid is regenerated from legacy MD5 when key_data changes