authentik.crypto.api

Crypto API Views

  1"""Crypto API Views"""
  2
  3from cryptography.hazmat.backends import default_backend
  4from cryptography.hazmat.primitives.serialization import load_pem_private_key
  5from cryptography.x509 import load_pem_x509_certificate
  6from django.http.response import HttpResponse
  7from django.urls import reverse
  8from django.utils.translation import gettext_lazy as _
  9from django_filters import FilterSet
 10from django_filters.filters import BooleanFilter, MultipleChoiceFilter
 11from drf_spectacular.types import OpenApiTypes
 12from drf_spectacular.utils import (
 13    OpenApiParameter,
 14    OpenApiResponse,
 15    extend_schema,
 16)
 17from rest_framework.decorators import action
 18from rest_framework.exceptions import ValidationError
 19from rest_framework.fields import (
 20    CharField,
 21    ChoiceField,
 22    IntegerField,
 23    SerializerMethodField,
 24)
 25from rest_framework.filters import OrderingFilter, SearchFilter
 26from rest_framework.permissions import IsAuthenticated
 27from rest_framework.request import Request
 28from rest_framework.response import Response
 29from rest_framework.validators import UniqueValidator
 30from rest_framework.viewsets import ModelViewSet
 31from structlog.stdlib import get_logger
 32
 33from authentik.api.validation import validate
 34from authentik.core.api.used_by import UsedByMixin
 35from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 36from authentik.core.models import UserTypes
 37from authentik.crypto.apps import MANAGED_KEY
 38from authentik.crypto.builder import CertificateBuilder, PrivateKeyAlg
 39from authentik.crypto.models import CertificateKeyPair, KeyType
 40from authentik.events.models import Event, EventAction
 41from authentik.rbac.decorators import permission_required
 42from authentik.rbac.filters import SecretKeyFilter
 43
 44LOGGER = get_logger()
 45
 46
 47class CertificateKeyPairSerializer(ModelSerializer):
 48    """CertificateKeyPair Serializer"""
 49
 50    private_key_available = SerializerMethodField()
 51
 52    certificate_download_url = SerializerMethodField()
 53    private_key_download_url = SerializerMethodField()
 54
 55    def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
 56        """Show if this keypair has a private key configured or not"""
 57        return instance.key_data != "" and instance.key_data is not None
 58
 59    def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
 60        """Get URL to download certificate"""
 61        return (
 62            reverse(
 63                "authentik_api:certificatekeypair-view-certificate",
 64                kwargs={"pk": instance.pk},
 65            )
 66            + "?download"
 67        )
 68
 69    def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
 70        """Get URL to download private key"""
 71        return (
 72            reverse(
 73                "authentik_api:certificatekeypair-view-private-key",
 74                kwargs={"pk": instance.pk},
 75            )
 76            + "?download"
 77        )
 78
 79    def validate_certificate_data(self, value: str) -> str:
 80        """Verify that input is a valid PEM x509 Certificate"""
 81        try:
 82            # Cast to string to fully load and parse certificate
 83            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
 84            str(load_pem_x509_certificate(value.encode("utf-8"), default_backend()))
 85        except ValueError as exc:
 86            LOGGER.warning("Failed to load certificate", exc=exc)
 87            raise ValidationError("Unable to load certificate.") from None
 88        return value
 89
 90    def validate_key_data(self, value: str) -> str:
 91        """Verify that input is a valid PEM Key"""
 92        # Since this field is optional, data can be empty.
 93        if value != "":
 94            try:
 95                # Cast to string to fully load and parse certificate
 96                # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
 97                str(
 98                    load_pem_private_key(
 99                        str.encode("\n".join([x.strip() for x in value.split("\n")])),
100                        password=None,
101                        backend=default_backend(),
102                    )
103                )
104            except (ValueError, TypeError) as exc:
105                LOGGER.warning("Failed to load private key", exc=exc)
106                raise ValidationError("Unable to load private key (possibly encrypted?).") from None
107        return value
108
109    class Meta:
110        model = CertificateKeyPair
111        fields = [
112            "pk",
113            "name",
114            "fingerprint_sha256",
115            "fingerprint_sha1",
116            "certificate_data",
117            "key_data",
118            "cert_expiry",
119            "cert_subject",
120            "private_key_available",
121            "key_type",
122            "certificate_download_url",
123            "private_key_download_url",
124            "managed",
125        ]
126        extra_kwargs = {
127            "managed": {"read_only": True},
128            "key_data": {"write_only": True},
129            "certificate_data": {"write_only": True},
130            "fingerprint_sha256": {"read_only": True},
131            "fingerprint_sha1": {"read_only": True},
132            "cert_expiry": {"read_only": True},
133            "cert_subject": {"read_only": True},
134            "key_type": {"read_only": True},
135        }
136
137
138class CertificateDataSerializer(PassiveSerializer):
139    """Get CertificateKeyPair's data"""
140
141    data = CharField(read_only=True)
142
143
144class CertificateGenerationSerializer(PassiveSerializer):
145    """Certificate generation parameters"""
146
147    common_name = CharField(
148        validators=[UniqueValidator(queryset=CertificateKeyPair.objects.all())],
149        source="name",
150    )
151    subject_alt_name = CharField(required=False, allow_blank=True, label=_("Subject-alt name"))
152    validity_days = IntegerField(initial=365)
153    alg = ChoiceField(default=PrivateKeyAlg.RSA, choices=PrivateKeyAlg.choices)
154
155
156class CertificateKeyPairFilter(FilterSet):
157    """Filter for certificates"""
158
159    has_key = BooleanFilter(
160        label="Only return certificate-key pairs with keys", method="filter_has_key"
161    )
162
163    key_type = MultipleChoiceFilter(
164        choices=KeyType.choices,
165        label="Filter by key algorithm type",
166        method="filter_key_type",
167    )
168
169    def filter_has_key(self, queryset, name, value):  # pragma: no cover
170        """Only return certificate-key pairs with keys"""
171        if not value:
172            return queryset
173        return queryset.exclude(key_data__exact="")
174
175    def filter_key_type(self, queryset, name, value):  # pragma: no cover
176        """Filter certificates by key type using the stored database field"""
177        if not value:
178            return queryset
179
180        # value is a list of KeyType enum values from MultipleChoiceFilter
181        return queryset.filter(key_type__in=value)
182
183    class Meta:
184        model = CertificateKeyPair
185        fields = ["name", "managed"]
186
187
188class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
189    """CertificateKeyPair Viewset"""
190
191    queryset = CertificateKeyPair.objects.exclude(managed=MANAGED_KEY)
192    serializer_class = CertificateKeyPairSerializer
193    filterset_class = CertificateKeyPairFilter
194    ordering = ["name"]
195    search_fields = ["name"]
196    filter_backends = [SecretKeyFilter, OrderingFilter, SearchFilter]
197
198    @extend_schema(
199        parameters=[
200            # Override the type for `has_key` above
201            OpenApiParameter(
202                "has_key",
203                bool,
204                required=False,
205                description="Only return certificate-key pairs with keys",
206            ),
207            OpenApiParameter(
208                "key_type",
209                OpenApiTypes.STR,
210                required=False,
211                many=True,
212                enum=[choice[0] for choice in KeyType.choices],
213                description=(
214                    "Filter by key algorithm type (RSA, EC, DSA, etc). "
215                    "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
216                ),
217            ),
218        ]
219    )
220    def list(self, request, *args, **kwargs):
221        return super().list(request, *args, **kwargs)
222
223    @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
224    @extend_schema(
225        request=CertificateGenerationSerializer(),
226        responses={
227            200: CertificateKeyPairSerializer,
228            400: OpenApiResponse(description="Bad request"),
229        },
230    )
231    @action(detail=False, methods=["POST"])
232    @validate(CertificateGenerationSerializer)
233    def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
234        """Generate a new, self-signed certificate-key pair"""
235        raw_san = body.validated_data.get("subject_alt_name", "")
236        sans = raw_san.split(",") if raw_san != "" else []
237        builder = CertificateBuilder(body.validated_data["name"])
238        builder.alg = body.validated_data["alg"]
239        builder.build(
240            subject_alt_names=sans,
241            validity_days=int(body.validated_data["validity_days"]),
242        )
243        instance = builder.save()
244        serializer = self.get_serializer(instance)
245        return Response(serializer.data)
246
247    @permission_required("view_certificatekeypair_certificate")
248    @extend_schema(
249        parameters=[
250            OpenApiParameter(
251                name="download",
252                location=OpenApiParameter.QUERY,
253                type=OpenApiTypes.BOOL,
254            )
255        ],
256        responses={200: CertificateDataSerializer(many=False)},
257    )
258    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
259    def view_certificate(self, request: Request, pk: str) -> Response:
260        """Return certificate-key pairs certificate and log access"""
261        certificate: CertificateKeyPair = self.get_object()
262        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
263            Event.new(  # noqa # nosec
264                EventAction.SECRET_VIEW,
265                secret=certificate,
266                type="certificate",
267            ).from_http(request)
268        if "download" in request.query_params:
269            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
270            response = HttpResponse(
271                certificate.certificate_data, content_type="application/x-pem-file"
272            )
273            response["Content-Disposition"] = (
274                f'attachment; filename="{certificate.name}_certificate.pem"'
275            )
276            return response
277        return Response(CertificateDataSerializer({"data": certificate.certificate_data}).data)
278
279    @permission_required("view_certificatekeypair_key")
280    @extend_schema(
281        parameters=[
282            OpenApiParameter(
283                name="download",
284                location=OpenApiParameter.QUERY,
285                type=OpenApiTypes.BOOL,
286            )
287        ],
288        responses={200: CertificateDataSerializer(many=False)},
289    )
290    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
291    def view_private_key(self, request: Request, pk: str) -> Response:
292        """Return certificate-key pairs private key and log access"""
293        certificate: CertificateKeyPair = self.get_object()
294        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
295            Event.new(  # noqa # nosec
296                EventAction.SECRET_VIEW,
297                secret=certificate,
298                type="private_key",
299            ).from_http(request)
300        if "download" in request.query_params:
301            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
302            response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")
303            response["Content-Disposition"] = (
304                f'attachment; filename="{certificate.name}_private_key.pem"'
305            )
306            return response
307        return Response(CertificateDataSerializer({"data": certificate.key_data}).data)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class CertificateKeyPairSerializer(authentik.core.api.utils.ModelSerializer):
 48class CertificateKeyPairSerializer(ModelSerializer):
 49    """CertificateKeyPair Serializer"""
 50
 51    private_key_available = SerializerMethodField()
 52
 53    certificate_download_url = SerializerMethodField()
 54    private_key_download_url = SerializerMethodField()
 55
 56    def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
 57        """Show if this keypair has a private key configured or not"""
 58        return instance.key_data != "" and instance.key_data is not None
 59
 60    def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
 61        """Get URL to download certificate"""
 62        return (
 63            reverse(
 64                "authentik_api:certificatekeypair-view-certificate",
 65                kwargs={"pk": instance.pk},
 66            )
 67            + "?download"
 68        )
 69
 70    def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
 71        """Get URL to download private key"""
 72        return (
 73            reverse(
 74                "authentik_api:certificatekeypair-view-private-key",
 75                kwargs={"pk": instance.pk},
 76            )
 77            + "?download"
 78        )
 79
 80    def validate_certificate_data(self, value: str) -> str:
 81        """Verify that input is a valid PEM x509 Certificate"""
 82        try:
 83            # Cast to string to fully load and parse certificate
 84            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
 85            str(load_pem_x509_certificate(value.encode("utf-8"), default_backend()))
 86        except ValueError as exc:
 87            LOGGER.warning("Failed to load certificate", exc=exc)
 88            raise ValidationError("Unable to load certificate.") from None
 89        return value
 90
 91    def validate_key_data(self, value: str) -> str:
 92        """Verify that input is a valid PEM Key"""
 93        # Since this field is optional, data can be empty.
 94        if value != "":
 95            try:
 96                # Cast to string to fully load and parse certificate
 97                # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
 98                str(
 99                    load_pem_private_key(
100                        str.encode("\n".join([x.strip() for x in value.split("\n")])),
101                        password=None,
102                        backend=default_backend(),
103                    )
104                )
105            except (ValueError, TypeError) as exc:
106                LOGGER.warning("Failed to load private key", exc=exc)
107                raise ValidationError("Unable to load private key (possibly encrypted?).") from None
108        return value
109
110    class Meta:
111        model = CertificateKeyPair
112        fields = [
113            "pk",
114            "name",
115            "fingerprint_sha256",
116            "fingerprint_sha1",
117            "certificate_data",
118            "key_data",
119            "cert_expiry",
120            "cert_subject",
121            "private_key_available",
122            "key_type",
123            "certificate_download_url",
124            "private_key_download_url",
125            "managed",
126        ]
127        extra_kwargs = {
128            "managed": {"read_only": True},
129            "key_data": {"write_only": True},
130            "certificate_data": {"write_only": True},
131            "fingerprint_sha256": {"read_only": True},
132            "fingerprint_sha1": {"read_only": True},
133            "cert_expiry": {"read_only": True},
134            "cert_subject": {"read_only": True},
135            "key_type": {"read_only": True},
136        }

CertificateKeyPair Serializer

private_key_available
certificate_download_url
private_key_download_url
def get_private_key_available(self, instance: authentik.crypto.models.CertificateKeyPair) -> bool:
56    def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
57        """Show if this keypair has a private key configured or not"""
58        return instance.key_data != "" and instance.key_data is not None

Show if this keypair has a private key configured or not

def get_certificate_download_url(self, instance: authentik.crypto.models.CertificateKeyPair) -> str:
60    def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
61        """Get URL to download certificate"""
62        return (
63            reverse(
64                "authentik_api:certificatekeypair-view-certificate",
65                kwargs={"pk": instance.pk},
66            )
67            + "?download"
68        )

Get URL to download certificate

def get_private_key_download_url(self, instance: authentik.crypto.models.CertificateKeyPair) -> str:
70    def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
71        """Get URL to download private key"""
72        return (
73            reverse(
74                "authentik_api:certificatekeypair-view-private-key",
75                kwargs={"pk": instance.pk},
76            )
77            + "?download"
78        )

Get URL to download private key

def validate_certificate_data(self, value: str) -> str:
80    def validate_certificate_data(self, value: str) -> str:
81        """Verify that input is a valid PEM x509 Certificate"""
82        try:
83            # Cast to string to fully load and parse certificate
84            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
85            str(load_pem_x509_certificate(value.encode("utf-8"), default_backend()))
86        except ValueError as exc:
87            LOGGER.warning("Failed to load certificate", exc=exc)
88            raise ValidationError("Unable to load certificate.") from None
89        return value

Verify that input is a valid PEM x509 Certificate

def validate_key_data(self, value: str) -> str:
 91    def validate_key_data(self, value: str) -> str:
 92        """Verify that input is a valid PEM Key"""
 93        # Since this field is optional, data can be empty.
 94        if value != "":
 95            try:
 96                # Cast to string to fully load and parse certificate
 97                # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
 98                str(
 99                    load_pem_private_key(
100                        str.encode("\n".join([x.strip() for x in value.split("\n")])),
101                        password=None,
102                        backend=default_backend(),
103                    )
104                )
105            except (ValueError, TypeError) as exc:
106                LOGGER.warning("Failed to load private key", exc=exc)
107                raise ValidationError("Unable to load private key (possibly encrypted?).") from None
108        return value

Verify that input is a valid PEM Key

class CertificateKeyPairSerializer.Meta:
110    class Meta:
111        model = CertificateKeyPair
112        fields = [
113            "pk",
114            "name",
115            "fingerprint_sha256",
116            "fingerprint_sha1",
117            "certificate_data",
118            "key_data",
119            "cert_expiry",
120            "cert_subject",
121            "private_key_available",
122            "key_type",
123            "certificate_download_url",
124            "private_key_download_url",
125            "managed",
126        ]
127        extra_kwargs = {
128            "managed": {"read_only": True},
129            "key_data": {"write_only": True},
130            "certificate_data": {"write_only": True},
131            "fingerprint_sha256": {"read_only": True},
132            "fingerprint_sha1": {"read_only": True},
133            "cert_expiry": {"read_only": True},
134            "cert_subject": {"read_only": True},
135            "key_type": {"read_only": True},
136        }
fields = ['pk', 'name', 'fingerprint_sha256', 'fingerprint_sha1', 'certificate_data', 'key_data', 'cert_expiry', 'cert_subject', 'private_key_available', 'key_type', 'certificate_download_url', 'private_key_download_url', 'managed']
extra_kwargs = {'managed': {'read_only': True}, 'key_data': {'write_only': True}, 'certificate_data': {'write_only': True}, 'fingerprint_sha256': {'read_only': True}, 'fingerprint_sha1': {'read_only': True}, 'cert_expiry': {'read_only': True}, 'cert_subject': {'read_only': True}, 'key_type': {'read_only': True}}
class CertificateDataSerializer(authentik.core.api.utils.PassiveSerializer):
139class CertificateDataSerializer(PassiveSerializer):
140    """Get CertificateKeyPair's data"""
141
142    data = CharField(read_only=True)

Get CertificateKeyPair's data

data
584    @property
585    def data(self):
586        ret = super().data
587        return ReturnDict(ret, serializer=self)
class CertificateGenerationSerializer(authentik.core.api.utils.PassiveSerializer):
145class CertificateGenerationSerializer(PassiveSerializer):
146    """Certificate generation parameters"""
147
148    common_name = CharField(
149        validators=[UniqueValidator(queryset=CertificateKeyPair.objects.all())],
150        source="name",
151    )
152    subject_alt_name = CharField(required=False, allow_blank=True, label=_("Subject-alt name"))
153    validity_days = IntegerField(initial=365)
154    alg = ChoiceField(default=PrivateKeyAlg.RSA, choices=PrivateKeyAlg.choices)

Certificate generation parameters

common_name
subject_alt_name
validity_days
alg
class CertificateKeyPairFilter(django_filters.filterset.FilterSet):
157class CertificateKeyPairFilter(FilterSet):
158    """Filter for certificates"""
159
160    has_key = BooleanFilter(
161        label="Only return certificate-key pairs with keys", method="filter_has_key"
162    )
163
164    key_type = MultipleChoiceFilter(
165        choices=KeyType.choices,
166        label="Filter by key algorithm type",
167        method="filter_key_type",
168    )
169
170    def filter_has_key(self, queryset, name, value):  # pragma: no cover
171        """Only return certificate-key pairs with keys"""
172        if not value:
173            return queryset
174        return queryset.exclude(key_data__exact="")
175
176    def filter_key_type(self, queryset, name, value):  # pragma: no cover
177        """Filter certificates by key type using the stored database field"""
178        if not value:
179            return queryset
180
181        # value is a list of KeyType enum values from MultipleChoiceFilter
182        return queryset.filter(key_type__in=value)
183
184    class Meta:
185        model = CertificateKeyPair
186        fields = ["name", "managed"]

Filter for certificates

has_key
key_type
def filter_has_key(self, queryset, name, value):
170    def filter_has_key(self, queryset, name, value):  # pragma: no cover
171        """Only return certificate-key pairs with keys"""
172        if not value:
173            return queryset
174        return queryset.exclude(key_data__exact="")

Only return certificate-key pairs with keys

def filter_key_type(self, queryset, name, value):
176    def filter_key_type(self, queryset, name, value):  # pragma: no cover
177        """Filter certificates by key type using the stored database field"""
178        if not value:
179            return queryset
180
181        # value is a list of KeyType enum values from MultipleChoiceFilter
182        return queryset.filter(key_type__in=value)

Filter certificates by key type using the stored database field

declared_filters = OrderedDict({'has_key': <django_filters.filters.BooleanFilter object>, 'key_type': <django_filters.filters.MultipleChoiceFilter object>})
base_filters = OrderedDict({'name': <django_filters.filters.CharFilter object>, 'managed': <django_filters.filters.CharFilter object>, 'has_key': <django_filters.filters.BooleanFilter object>, 'key_type': <django_filters.filters.MultipleChoiceFilter object>})
class CertificateKeyPairFilter.Meta:
184    class Meta:
185        model = CertificateKeyPair
186        fields = ["name", "managed"]
fields = ['name', 'managed']
class CertificateKeyPairViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
189class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
190    """CertificateKeyPair Viewset"""
191
192    queryset = CertificateKeyPair.objects.exclude(managed=MANAGED_KEY)
193    serializer_class = CertificateKeyPairSerializer
194    filterset_class = CertificateKeyPairFilter
195    ordering = ["name"]
196    search_fields = ["name"]
197    filter_backends = [SecretKeyFilter, OrderingFilter, SearchFilter]
198
199    @extend_schema(
200        parameters=[
201            # Override the type for `has_key` above
202            OpenApiParameter(
203                "has_key",
204                bool,
205                required=False,
206                description="Only return certificate-key pairs with keys",
207            ),
208            OpenApiParameter(
209                "key_type",
210                OpenApiTypes.STR,
211                required=False,
212                many=True,
213                enum=[choice[0] for choice in KeyType.choices],
214                description=(
215                    "Filter by key algorithm type (RSA, EC, DSA, etc). "
216                    "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
217                ),
218            ),
219        ]
220    )
221    def list(self, request, *args, **kwargs):
222        return super().list(request, *args, **kwargs)
223
224    @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
225    @extend_schema(
226        request=CertificateGenerationSerializer(),
227        responses={
228            200: CertificateKeyPairSerializer,
229            400: OpenApiResponse(description="Bad request"),
230        },
231    )
232    @action(detail=False, methods=["POST"])
233    @validate(CertificateGenerationSerializer)
234    def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
235        """Generate a new, self-signed certificate-key pair"""
236        raw_san = body.validated_data.get("subject_alt_name", "")
237        sans = raw_san.split(",") if raw_san != "" else []
238        builder = CertificateBuilder(body.validated_data["name"])
239        builder.alg = body.validated_data["alg"]
240        builder.build(
241            subject_alt_names=sans,
242            validity_days=int(body.validated_data["validity_days"]),
243        )
244        instance = builder.save()
245        serializer = self.get_serializer(instance)
246        return Response(serializer.data)
247
248    @permission_required("view_certificatekeypair_certificate")
249    @extend_schema(
250        parameters=[
251            OpenApiParameter(
252                name="download",
253                location=OpenApiParameter.QUERY,
254                type=OpenApiTypes.BOOL,
255            )
256        ],
257        responses={200: CertificateDataSerializer(many=False)},
258    )
259    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
260    def view_certificate(self, request: Request, pk: str) -> Response:
261        """Return certificate-key pairs certificate and log access"""
262        certificate: CertificateKeyPair = self.get_object()
263        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
264            Event.new(  # noqa # nosec
265                EventAction.SECRET_VIEW,
266                secret=certificate,
267                type="certificate",
268            ).from_http(request)
269        if "download" in request.query_params:
270            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
271            response = HttpResponse(
272                certificate.certificate_data, content_type="application/x-pem-file"
273            )
274            response["Content-Disposition"] = (
275                f'attachment; filename="{certificate.name}_certificate.pem"'
276            )
277            return response
278        return Response(CertificateDataSerializer({"data": certificate.certificate_data}).data)
279
280    @permission_required("view_certificatekeypair_key")
281    @extend_schema(
282        parameters=[
283            OpenApiParameter(
284                name="download",
285                location=OpenApiParameter.QUERY,
286                type=OpenApiTypes.BOOL,
287            )
288        ],
289        responses={200: CertificateDataSerializer(many=False)},
290    )
291    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
292    def view_private_key(self, request: Request, pk: str) -> Response:
293        """Return certificate-key pairs private key and log access"""
294        certificate: CertificateKeyPair = self.get_object()
295        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
296            Event.new(  # noqa # nosec
297                EventAction.SECRET_VIEW,
298                secret=certificate,
299                type="private_key",
300            ).from_http(request)
301        if "download" in request.query_params:
302            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
303            response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")
304            response["Content-Disposition"] = (
305                f'attachment; filename="{certificate.name}_private_key.pem"'
306            )
307            return response
308        return Response(CertificateDataSerializer({"data": certificate.key_data}).data)

CertificateKeyPair Viewset

queryset = <QuerySet []>
serializer_class = <class 'CertificateKeyPairSerializer'>
filterset_class = <class 'CertificateKeyPairFilter'>
ordering = ['name']
search_fields = ['name']
filter_backends = [<class 'authentik.rbac.filters.SecretKeyFilter'>, <class 'rest_framework.filters.OrderingFilter'>, <class 'rest_framework.filters.SearchFilter'>]
@extend_schema(parameters=[OpenApiParameter('has_key', bool, required=False, description='Only return certificate-key pairs with keys'), OpenApiParameter('key_type', OpenApiTypes.STR, required=False, many=True, enum=[choice[0] for choice in KeyType.choices], description="Filter by key algorithm type (RSA, EC, DSA, etc). Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')")])
def list(self, request, *args, **kwargs):
199    @extend_schema(
200        parameters=[
201            # Override the type for `has_key` above
202            OpenApiParameter(
203                "has_key",
204                bool,
205                required=False,
206                description="Only return certificate-key pairs with keys",
207            ),
208            OpenApiParameter(
209                "key_type",
210                OpenApiTypes.STR,
211                required=False,
212                many=True,
213                enum=[choice[0] for choice in KeyType.choices],
214                description=(
215                    "Filter by key algorithm type (RSA, EC, DSA, etc). "
216                    "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
217                ),
218            ),
219        ]
220    )
221    def list(self, request, *args, **kwargs):
222        return super().list(request, *args, **kwargs)
@permission_required(None, ['authentik_crypto.add_certificatekeypair'])
@extend_schema(request=CertificateGenerationSerializer(), responses={200: CertificateKeyPairSerializer, 400: OpenApiResponse(description='Bad request')})
@action(detail=False, methods=['POST'])
@validate(CertificateGenerationSerializer)
def generate( self, request: rest_framework.request.Request, body: CertificateGenerationSerializer) -> rest_framework.response.Response:
224    @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
225    @extend_schema(
226        request=CertificateGenerationSerializer(),
227        responses={
228            200: CertificateKeyPairSerializer,
229            400: OpenApiResponse(description="Bad request"),
230        },
231    )
232    @action(detail=False, methods=["POST"])
233    @validate(CertificateGenerationSerializer)
234    def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
235        """Generate a new, self-signed certificate-key pair"""
236        raw_san = body.validated_data.get("subject_alt_name", "")
237        sans = raw_san.split(",") if raw_san != "" else []
238        builder = CertificateBuilder(body.validated_data["name"])
239        builder.alg = body.validated_data["alg"]
240        builder.build(
241            subject_alt_names=sans,
242            validity_days=int(body.validated_data["validity_days"]),
243        )
244        instance = builder.save()
245        serializer = self.get_serializer(instance)
246        return Response(serializer.data)

Generate a new, self-signed certificate-key pair

@permission_required('view_certificatekeypair_certificate')
@extend_schema(parameters=[OpenApiParameter(name='download', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: CertificateDataSerializer(many=False)})
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def view_certificate( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
248    @permission_required("view_certificatekeypair_certificate")
249    @extend_schema(
250        parameters=[
251            OpenApiParameter(
252                name="download",
253                location=OpenApiParameter.QUERY,
254                type=OpenApiTypes.BOOL,
255            )
256        ],
257        responses={200: CertificateDataSerializer(many=False)},
258    )
259    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
260    def view_certificate(self, request: Request, pk: str) -> Response:
261        """Return certificate-key pairs certificate and log access"""
262        certificate: CertificateKeyPair = self.get_object()
263        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
264            Event.new(  # noqa # nosec
265                EventAction.SECRET_VIEW,
266                secret=certificate,
267                type="certificate",
268            ).from_http(request)
269        if "download" in request.query_params:
270            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
271            response = HttpResponse(
272                certificate.certificate_data, content_type="application/x-pem-file"
273            )
274            response["Content-Disposition"] = (
275                f'attachment; filename="{certificate.name}_certificate.pem"'
276            )
277            return response
278        return Response(CertificateDataSerializer({"data": certificate.certificate_data}).data)

Return certificate-key pairs certificate and log access

@permission_required('view_certificatekeypair_key')
@extend_schema(parameters=[OpenApiParameter(name='download', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: CertificateDataSerializer(many=False)})
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def view_private_key( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
280    @permission_required("view_certificatekeypair_key")
281    @extend_schema(
282        parameters=[
283            OpenApiParameter(
284                name="download",
285                location=OpenApiParameter.QUERY,
286                type=OpenApiTypes.BOOL,
287            )
288        ],
289        responses={200: CertificateDataSerializer(many=False)},
290    )
291    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
292    def view_private_key(self, request: Request, pk: str) -> Response:
293        """Return certificate-key pairs private key and log access"""
294        certificate: CertificateKeyPair = self.get_object()
295        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
296            Event.new(  # noqa # nosec
297                EventAction.SECRET_VIEW,
298                secret=certificate,
299                type="private_key",
300            ).from_http(request)
301        if "download" in request.query_params:
302            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
303            response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")
304            response["Content-Disposition"] = (
305                f'attachment; filename="{certificate.name}_private_key.pem"'
306            )
307            return response
308        return Response(CertificateDataSerializer({"data": certificate.key_data}).data)

Return certificate-key pairs private key and log access

name = None
description = None
suffix = None
detail = None
basename = None