authentik.crypto.api
Crypto API Views
"""Crypto API Views"""

from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate
from django.http.response import HttpResponse
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from django_filters import FilterSet
from django_filters.filters import BooleanFilter, MultipleChoiceFilter
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import (
    OpenApiParameter,
    OpenApiResponse,
    extend_schema,
)
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import (
    CharField,
    ChoiceField,
    IntegerField,
    SerializerMethodField,
)
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger

from authentik.api.validation import validate
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import UserTypes
from authentik.crypto.apps import MANAGED_KEY
from authentik.crypto.builder import CertificateBuilder, PrivateKeyAlg
from authentik.crypto.models import CertificateKeyPair, KeyType
from authentik.events.models import Event, EventAction
from authentik.rbac.decorators import permission_required
from authentik.rbac.filters import SecretKeyFilter

LOGGER = get_logger()


class CertificateKeyPairSerializer(ModelSerializer):
    """CertificateKeyPair Serializer"""

    # Computed, read-only fields; each is filled by the matching `get_<name>` method below.
    private_key_available = SerializerMethodField()

    certificate_download_url = SerializerMethodField()
    private_key_download_url = SerializerMethodField()

    def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
        """Show if this keypair has a private key configured or not"""
        # Both an empty string and None count as "no private key".
        return instance.key_data != "" and instance.key_data is not None

    def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
        """Get URL to download certificate"""
        # The bare `download` query parameter makes `view_certificate` answer with a
        # file attachment instead of serialized data.
        return (
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": instance.pk},
            )
            + "?download"
        )

    def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
        """Get URL to download private key"""
        # The bare `download` query parameter makes `view_private_key` answer with a
        # file attachment instead of serialized data.
        return (
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": instance.pk},
            )
            + "?download"
        )

    def validate_certificate_data(self, value: str) -> str:
        """Verify that input is a valid PEM x509 Certificate

        Returns the input unchanged; raises ``ValidationError`` when it cannot be parsed.
        """
        try:
            # Cast to string to fully load and parse certificate
            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
            str(load_pem_x509_certificate(value.encode("utf-8"), default_backend()))
        except ValueError as exc:
            LOGGER.warning("Failed to load certificate", exc=exc)
            raise ValidationError("Unable to load certificate.") from None
        return value

    def validate_key_data(self, value: str) -> str:
        """Verify that input is a valid PEM Key

        Returns the input unchanged (the whitespace-trimmed copy is only used for
        parsing); raises ``ValidationError`` when a non-empty value cannot be loaded
        as an unencrypted private key.
        """
        # Since this field is optional, data can be empty.
        if value == "":
            return value
        # Trim surrounding whitespace from every line before parsing.
        normalized = "\n".join(line.strip() for line in value.split("\n"))
        try:
            # Cast to string to fully load and parse the private key
            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
            str(
                load_pem_private_key(
                    normalized.encode(),
                    password=None,
                    backend=default_backend(),
                )
            )
        # UnsupportedAlgorithm is not a ValueError/TypeError subclass; without it an
        # unsupported key type would surface as an unhandled error rather than as a
        # validation failure.
        except (ValueError, TypeError, UnsupportedAlgorithm) as exc:
            LOGGER.warning("Failed to load private key", exc=exc)
            raise ValidationError("Unable to load private key (possibly encrypted?).") from None
        return value

    class Meta:
        model = CertificateKeyPair
        fields = [
            "pk",
            "name",
            "fingerprint_sha256",
            "fingerprint_sha1",
            "certificate_data",
            "key_data",
            "cert_expiry",
            "cert_subject",
            "private_key_available",
            "key_type",
            "certificate_download_url",
            "private_key_download_url",
            "managed",
        ]
        # The raw PEM fields are write-only, so they are never part of a serialized
        # response; everything else listed here is read-only.
        extra_kwargs = {
            "managed": {"read_only": True},
            "key_data": {"write_only": True},
            "certificate_data": {"write_only": True},
            "fingerprint_sha256": {"read_only": True},
            "fingerprint_sha1": {"read_only": True},
            "cert_expiry": {"read_only": True},
            "cert_subject": {"read_only": True},
            "key_type": {"read_only": True},
        }


class CertificateDataSerializer(PassiveSerializer):
    """Get CertificateKeyPair's data"""

    # Used by `view_certificate` / `view_private_key` to wrap the PEM text.
    data = CharField(read_only=True)


class CertificateGenerationSerializer(PassiveSerializer):
    """Certificate generation parameters"""

    # Sent by clients as `common_name`, available in validated data as `name`;
    # must not match the name of an existing key pair.
    common_name = CharField(
        validators=[UniqueValidator(queryset=CertificateKeyPair.objects.all())],
        source="name",
    )
    # Optional comma-separated list, split by `CertificateKeyPairViewSet.generate`.
    subject_alt_name = CharField(required=False, allow_blank=True, label=_("Subject-alt name"))
    validity_days = IntegerField(initial=365)
    alg = ChoiceField(default=PrivateKeyAlg.RSA, choices=PrivateKeyAlg.choices)


class CertificateKeyPairFilter(FilterSet):
    """Filter for certificates"""

    has_key = BooleanFilter(
        label="Only return certificate-key pairs with keys", method="filter_has_key"
    )

    key_type = MultipleChoiceFilter(
        choices=KeyType.choices,
        label="Filter by key algorithm type",
        method="filter_key_type",
    )

    def filter_has_key(self, queryset, name, value):  # pragma: no cover
        """Only return certificate-key pairs with keys"""
        # A falsy value (including an explicit false) leaves the queryset untouched.
        if not value:
            return queryset
        return queryset.exclude(key_data__exact="")

    def filter_key_type(self, queryset, name, value):  # pragma: no cover
        """Filter certificates by key type using the stored database field"""
        if not value:
            return queryset

        # value is a list of KeyType enum values from MultipleChoiceFilter
        return queryset.filter(key_type__in=value)

    class Meta:
        model = CertificateKeyPair
        fields = ["name", "managed"]


class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
    """CertificateKeyPair Viewset"""

    # Key pairs whose `managed` value equals MANAGED_KEY are never exposed here.
    queryset = CertificateKeyPair.objects.exclude(managed=MANAGED_KEY)
    serializer_class = CertificateKeyPairSerializer
    filterset_class = CertificateKeyPairFilter
    ordering = ["name"]
    search_fields = ["name"]
    filter_backends = [SecretKeyFilter, OrderingFilter, SearchFilter]

    @extend_schema(
        parameters=[
            # Override the type for `has_key` above
            OpenApiParameter(
                "has_key",
                bool,
                required=False,
                description="Only return certificate-key pairs with keys",
            ),
            OpenApiParameter(
                "key_type",
                OpenApiTypes.STR,
                required=False,
                many=True,
                enum=[choice[0] for choice in KeyType.choices],
                description=(
                    "Filter by key algorithm type (RSA, EC, DSA, etc). "
                    "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
                ),
            ),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Overridden only to attach the schema parameters above.
        return super().list(request, *args, **kwargs)

    @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
    @extend_schema(
        request=CertificateGenerationSerializer(),
        responses={
            200: CertificateKeyPairSerializer,
            400: OpenApiResponse(description="Bad request"),
        },
    )
    @action(detail=False, methods=["POST"])
    @validate(CertificateGenerationSerializer)
    def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
        """Generate a new, self-signed certificate-key pair"""
        params = body.validated_data
        # `subject_alt_name` is optional; a missing value is treated like an empty one.
        raw_san = params.get("subject_alt_name", "")
        # Trim whitespace around every comma-separated entry and drop empty entries
        # (e.g. from a trailing comma), so "a.example, 10.0.0.1" yields clean names
        # instead of a space-prefixed or blank one.
        sans = [san.strip() for san in raw_san.split(",") if san.strip()]
        builder = CertificateBuilder(params["name"])
        builder.alg = params["alg"]
        builder.build(
            subject_alt_names=sans,
            validity_days=int(params["validity_days"]),
        )
        instance = builder.save()
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    @permission_required("view_certificatekeypair_certificate")
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="download",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
        responses={200: CertificateDataSerializer(many=False)},
    )
    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
    def view_certificate(self, request: Request, pk: str) -> Response:
        """Return certificate-key pairs certificate and log access"""
        certificate: CertificateKeyPair = self.get_object()
        # Access by internal service accounts is not recorded as an event.
        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
            Event.new(  # noqa # nosec
                EventAction.SECRET_VIEW,
                secret=certificate,
                type="certificate",
            ).from_http(request)
        # Only the presence of `download` matters, not its value.
        if "download" in request.query_params:
            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
            response = HttpResponse(
                certificate.certificate_data, content_type="application/x-pem-file"
            )
            response["Content-Disposition"] = (
                f'attachment; filename="{certificate.name}_certificate.pem"'
            )
            return response
        return Response(CertificateDataSerializer({"data": certificate.certificate_data}).data)

    @permission_required("view_certificatekeypair_key")
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="download",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
        responses={200: CertificateDataSerializer(many=False)},
    )
    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
    def view_private_key(self, request: Request, pk: str) -> Response:
        """Return certificate-key pairs private key and log access"""
        certificate: CertificateKeyPair = self.get_object()
        # Access by internal service accounts is not recorded as an event.
        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
            Event.new(  # noqa # nosec
                EventAction.SECRET_VIEW,
                secret=certificate,
                type="private_key",
            ).from_http(request)
        # Only the presence of `download` matters, not its value.
        if "download" in request.query_params:
            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
            response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")
            response["Content-Disposition"] = (
                f'attachment; filename="{certificate.name}_private_key.pem"'
            )
            return response
        return Response(CertificateDataSerializer({"data": certificate.key_data}).data)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class CertificateKeyPairSerializer(ModelSerializer):
    """CertificateKeyPair Serializer"""

    # Computed, read-only fields; each is filled by the matching `get_<name>` method below.
    private_key_available = SerializerMethodField()

    certificate_download_url = SerializerMethodField()
    private_key_download_url = SerializerMethodField()

    def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
        """Show if this keypair has a private key configured or not"""
        # Both an empty string and None count as "no private key".
        return instance.key_data != "" and instance.key_data is not None

    def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
        """Get URL to download certificate"""
        # Resolved URL of the certificate view plus a bare `download` query parameter.
        return (
            reverse(
                "authentik_api:certificatekeypair-view-certificate",
                kwargs={"pk": instance.pk},
            )
            + "?download"
        )

    def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
        """Get URL to download private key"""
        # Resolved URL of the private key view plus a bare `download` query parameter.
        return (
            reverse(
                "authentik_api:certificatekeypair-view-private-key",
                kwargs={"pk": instance.pk},
            )
            + "?download"
        )

    def validate_certificate_data(self, value: str) -> str:
        """Verify that input is a valid PEM x509 Certificate

        Returns the input unchanged; raises ``ValidationError`` when it cannot be parsed.
        """
        try:
            # Cast to string to fully load and parse certificate
            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
            str(load_pem_x509_certificate(value.encode("utf-8"), default_backend()))
        except ValueError as exc:
            LOGGER.warning("Failed to load certificate", exc=exc)
            raise ValidationError("Unable to load certificate.") from None
        return value

    def validate_key_data(self, value: str) -> str:
        """Verify that input is a valid PEM Key

        Returns the input unchanged (the whitespace-trimmed copy is only used for
        parsing); raises ``ValidationError`` when a non-empty value cannot be loaded
        as an unencrypted private key.
        """
        # Imported here so this class does not rely on a change to the file's import section.
        from cryptography.exceptions import UnsupportedAlgorithm

        # Since this field is optional, data can be empty.
        if value == "":
            return value
        # Trim surrounding whitespace from every line before parsing.
        normalized = "\n".join(line.strip() for line in value.split("\n"))
        try:
            # Cast to string to fully load and parse the private key
            # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
            str(
                load_pem_private_key(
                    normalized.encode(),
                    password=None,
                    backend=default_backend(),
                )
            )
        # UnsupportedAlgorithm is not a ValueError/TypeError subclass; without it an
        # unsupported key type would surface as an unhandled error rather than as a
        # validation failure.
        except (ValueError, TypeError, UnsupportedAlgorithm) as exc:
            LOGGER.warning("Failed to load private key", exc=exc)
            raise ValidationError("Unable to load private key (possibly encrypted?).") from None
        return value

    class Meta:
        model = CertificateKeyPair
        fields = [
            "pk",
            "name",
            "fingerprint_sha256",
            "fingerprint_sha1",
            "certificate_data",
            "key_data",
            "cert_expiry",
            "cert_subject",
            "private_key_available",
            "key_type",
            "certificate_download_url",
            "private_key_download_url",
            "managed",
        ]
        # The raw PEM fields are write-only, so they are never part of a serialized
        # response; everything else listed here is read-only.
        extra_kwargs = {
            "managed": {"read_only": True},
            "key_data": {"write_only": True},
            "certificate_data": {"write_only": True},
            "fingerprint_sha256": {"read_only": True},
            "fingerprint_sha1": {"read_only": True},
            "cert_expiry": {"read_only": True},
            "cert_subject": {"read_only": True},
            "key_type": {"read_only": True},
        }
CertificateKeyPair Serializer
def get_private_key_available(self, instance: CertificateKeyPair) -> bool:
    """Show if this keypair has a private key configured or not"""
    key_material = instance.key_data
    # A missing value (None) and an empty string both mean "no private key".
    if key_material is None:
        return False
    return key_material != ""
Show if this keypair has a private key configured or not
def
get_certificate_download_url(self, instance: authentik.crypto.models.CertificateKeyPair) -> str:
def get_certificate_download_url(self, instance: CertificateKeyPair) -> str:
    """Get URL to download certificate"""
    view_url = reverse(
        "authentik_api:certificatekeypair-view-certificate",
        kwargs={"pk": instance.pk},
    )
    # Append a bare `download` query parameter to the resolved view URL.
    return f"{view_url}?download"
Get URL to download certificate
def
get_private_key_download_url(self, instance: authentik.crypto.models.CertificateKeyPair) -> str:
def get_private_key_download_url(self, instance: CertificateKeyPair) -> str:
    """Get URL to download private key"""
    view_url = reverse(
        "authentik_api:certificatekeypair-view-private-key",
        kwargs={"pk": instance.pk},
    )
    # Append a bare `download` query parameter to the resolved view URL.
    return f"{view_url}?download"
Get URL to download private key
def
validate_certificate_data(self, value: str) -> str:
def validate_certificate_data(self, value: str) -> str:
    """Check that the submitted text parses as a PEM-encoded x509 certificate.

    Returns the input unchanged on success. Raises ``ValidationError`` when
    encoding or parsing fails with a ``ValueError``; the underlying error is
    logged as a warning and not chained onto the raised exception.
    """
    try:
        pem_bytes = value.encode("utf-8")
        parsed = load_pem_x509_certificate(pem_bytes, default_backend())
        # Cast to string to fully load and parse certificate
        # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
        str(parsed)
    except ValueError as parse_error:
        LOGGER.warning("Failed to load certificate", exc=parse_error)
        raise ValidationError("Unable to load certificate.") from None
    else:
        return value
Verify that input is a valid PEM x509 Certificate
def
validate_key_data(self, value: str) -> str:
def validate_key_data(self, value: str) -> str:
    """Verify that input is a valid PEM Key

    Returns the input unchanged (the whitespace-trimmed copy is only used for
    parsing). An empty string is accepted as-is. Raises ``ValidationError`` when
    a non-empty value cannot be loaded as an unencrypted private key.
    """
    # Imported here so this method does not rely on a change to the file's import section.
    from cryptography.exceptions import UnsupportedAlgorithm

    # Since this field is optional, data can be empty.
    if value == "":
        return value
    # Trim surrounding whitespace from every line before parsing.
    normalized = "\n".join(line.strip() for line in value.split("\n"))
    try:
        # Cast to string to fully load and parse the private key
        # Prevents issues like https://github.com/goauthentik/authentik/issues/2082
        str(
            load_pem_private_key(
                normalized.encode(),
                password=None,
                backend=default_backend(),
            )
        )
    # UnsupportedAlgorithm is not a ValueError/TypeError subclass; without it an
    # unsupported key type would surface as an unhandled error rather than as a
    # validation failure.
    except (ValueError, TypeError, UnsupportedAlgorithm) as exc:
        LOGGER.warning("Failed to load private key", exc=exc)
        raise ValidationError("Unable to load private key (possibly encrypted?).") from None
    return value
Verify that input is a valid PEM Key
Inherited Members
class
CertificateKeyPairSerializer.Meta:
class Meta:
    model = CertificateKeyPair
    fields = [
        # Identity
        "pk",
        "name",
        # Values that are read-only per `extra_kwargs` below
        "fingerprint_sha256",
        "fingerprint_sha1",
        # Raw PEM input, write-only per `extra_kwargs` below
        "certificate_data",
        "key_data",
        "cert_expiry",
        "cert_subject",
        # Serializer method fields
        "private_key_available",
        "key_type",
        "certificate_download_url",
        "private_key_download_url",
        "managed",
    ]
    # Same mapping as spelling every entry out: the PEM fields are write-only,
    # all other listed fields are read-only. Every field gets its own dict object.
    extra_kwargs = {
        "managed": {"read_only": True},
        **{field_name: {"write_only": True} for field_name in ("key_data", "certificate_data")},
        **{
            field_name: {"read_only": True}
            for field_name in (
                "fingerprint_sha256",
                "fingerprint_sha1",
                "cert_expiry",
                "cert_subject",
                "key_type",
            )
        },
    }
model =
<class 'authentik.crypto.models.CertificateKeyPair'>
fields =
['pk', 'name', 'fingerprint_sha256', 'fingerprint_sha1', 'certificate_data', 'key_data', 'cert_expiry', 'cert_subject', 'private_key_available', 'key_type', 'certificate_download_url', 'private_key_download_url', 'managed']
extra_kwargs =
{'managed': {'read_only': True}, 'key_data': {'write_only': True}, 'certificate_data': {'write_only': True}, 'fingerprint_sha256': {'read_only': True}, 'fingerprint_sha1': {'read_only': True}, 'cert_expiry': {'read_only': True}, 'cert_subject': {'read_only': True}, 'key_type': {'read_only': True}}
class CertificateDataSerializer(PassiveSerializer):
    """Get CertificateKeyPair's data"""

    # Single output-only text field; the views in this module fill it with either
    # the certificate PEM or the private key PEM.
    data = CharField(
        read_only=True,
    )
Get CertificateKeyPair's data
Inherited Members
class CertificateGenerationSerializer(PassiveSerializer):
    """Certificate generation parameters"""

    # Sent by clients as `common_name`, available in validated data as `name`;
    # must not match the name of an existing key pair.
    common_name = CharField(
        source="name",
        validators=[
            UniqueValidator(queryset=CertificateKeyPair.objects.all()),
        ],
    )
    # Optional comma-separated list; may be omitted or left blank.
    subject_alt_name = CharField(
        label=_("Subject-alt name"),
        required=False,
        allow_blank=True,
    )
    validity_days = IntegerField(initial=365)
    # Key algorithm; falls back to RSA when not supplied.
    alg = ChoiceField(
        choices=PrivateKeyAlg.choices,
        default=PrivateKeyAlg.RSA,
    )
Certificate generation parameters
Inherited Members
class
CertificateKeyPairFilter(django_filters.filterset.FilterSet):
class CertificateKeyPairFilter(FilterSet):
    """Filter for certificates"""

    has_key = BooleanFilter(
        method="filter_has_key",
        label="Only return certificate-key pairs with keys",
    )

    key_type = MultipleChoiceFilter(
        method="filter_key_type",
        label="Filter by key algorithm type",
        choices=KeyType.choices,
    )

    def filter_has_key(self, queryset, name, value):  # pragma: no cover
        """Drop key pairs whose ``key_data`` is an empty string when ``value`` is truthy.

        Any falsy ``value`` returns ``queryset`` unchanged.
        """
        return queryset.exclude(key_data__exact="") if value else queryset

    def filter_key_type(self, queryset, name, value):  # pragma: no cover
        """Keep only key pairs whose stored ``key_type`` is one of the requested values.

        An empty or missing selection returns ``queryset`` unchanged.
        """
        if value:
            # value is a list of KeyType enum values from MultipleChoiceFilter
            return queryset.filter(key_type__in=value)
        return queryset

    class Meta:
        model = CertificateKeyPair
        fields = ["name", "managed"]
Filter for certificates
def
filter_has_key(self, queryset, name, value):
def filter_has_key(self, queryset, name, value):  # pragma: no cover
    """Drop key pairs whose ``key_data`` is an empty string when ``value`` is truthy.

    Any falsy ``value`` returns ``queryset`` unchanged.
    """
    return queryset.exclude(key_data__exact="") if value else queryset
Only return certificate-key pairs with keys
def
filter_key_type(self, queryset, name, value):
def filter_key_type(self, queryset, name, value):  # pragma: no cover
    """Keep only key pairs whose stored ``key_type`` is one of the requested values.

    An empty or missing selection returns ``queryset`` unchanged.
    """
    if value:
        # value is a list of KeyType enum values from MultipleChoiceFilter
        return queryset.filter(key_type__in=value)
    return queryset
Filter certificates by key type using the stored database field
class
CertificateKeyPairFilter.Meta:
model =
<class 'authentik.crypto.models.CertificateKeyPair'>
class
CertificateKeyPairViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
    """CertificateKeyPair Viewset"""

    # Key pairs whose `managed` value equals MANAGED_KEY are never exposed here.
    queryset = CertificateKeyPair.objects.exclude(managed=MANAGED_KEY)
    serializer_class = CertificateKeyPairSerializer
    filterset_class = CertificateKeyPairFilter
    ordering = ["name"]
    search_fields = ["name"]
    filter_backends = [SecretKeyFilter, OrderingFilter, SearchFilter]

    @extend_schema(
        parameters=[
            # Override the type for `has_key` above
            OpenApiParameter(
                "has_key",
                bool,
                required=False,
                description="Only return certificate-key pairs with keys",
            ),
            OpenApiParameter(
                "key_type",
                OpenApiTypes.STR,
                required=False,
                many=True,
                enum=[choice[0] for choice in KeyType.choices],
                description=(
                    "Filter by key algorithm type (RSA, EC, DSA, etc). "
                    "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
                ),
            ),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Overridden only to attach the schema parameters above.
        return super().list(request, *args, **kwargs)

    @permission_required(None, ["authentik_crypto.add_certificatekeypair"])
    @extend_schema(
        request=CertificateGenerationSerializer(),
        responses={
            200: CertificateKeyPairSerializer,
            400: OpenApiResponse(description="Bad request"),
        },
    )
    @action(detail=False, methods=["POST"])
    @validate(CertificateGenerationSerializer)
    def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
        """Generate a new, self-signed certificate-key pair"""
        params = body.validated_data
        # `subject_alt_name` is optional; a missing value is treated like an empty one.
        raw_san = params.get("subject_alt_name", "")
        # Trim whitespace around every comma-separated entry and drop empty entries
        # (e.g. from a trailing comma), so "a.example, 10.0.0.1" yields clean names
        # instead of a space-prefixed or blank one.
        sans = [san.strip() for san in raw_san.split(",") if san.strip()]
        builder = CertificateBuilder(params["name"])
        builder.alg = params["alg"]
        builder.build(
            subject_alt_names=sans,
            validity_days=int(params["validity_days"]),
        )
        instance = builder.save()
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    @permission_required("view_certificatekeypair_certificate")
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="download",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
        responses={200: CertificateDataSerializer(many=False)},
    )
    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
    def view_certificate(self, request: Request, pk: str) -> Response:
        """Return certificate-key pairs certificate and log access"""
        certificate: CertificateKeyPair = self.get_object()
        # Access by internal service accounts is not recorded as an event.
        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
            Event.new(  # noqa # nosec
                EventAction.SECRET_VIEW,
                secret=certificate,
                type="certificate",
            ).from_http(request)
        # Only the presence of `download` matters, not its value.
        if "download" in request.query_params:
            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
            response = HttpResponse(
                certificate.certificate_data, content_type="application/x-pem-file"
            )
            response["Content-Disposition"] = (
                f'attachment; filename="{certificate.name}_certificate.pem"'
            )
            return response
        return Response(CertificateDataSerializer({"data": certificate.certificate_data}).data)

    @permission_required("view_certificatekeypair_key")
    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="download",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            )
        ],
        responses={200: CertificateDataSerializer(many=False)},
    )
    @action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
    def view_private_key(self, request: Request, pk: str) -> Response:
        """Return certificate-key pairs private key and log access"""
        certificate: CertificateKeyPair = self.get_object()
        # Access by internal service accounts is not recorded as an event.
        if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
            Event.new(  # noqa # nosec
                EventAction.SECRET_VIEW,
                secret=certificate,
                type="private_key",
            ).from_http(request)
        # Only the presence of `download` matters, not its value.
        if "download" in request.query_params:
            # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
            response = HttpResponse(certificate.key_data, content_type="application/x-pem-file")
            response["Content-Disposition"] = (
                f'attachment; filename="{certificate.name}_private_key.pem"'
            )
            return response
        return Response(CertificateDataSerializer({"data": certificate.key_data}).data)
CertificateKeyPair Viewset
serializer_class =
<class 'CertificateKeyPairSerializer'>
filterset_class =
<class 'CertificateKeyPairFilter'>
filter_backends =
[<class 'authentik.rbac.filters.SecretKeyFilter'>, <class 'rest_framework.filters.OrderingFilter'>, <class 'rest_framework.filters.SearchFilter'>]
@extend_schema(parameters=[OpenApiParameter('has_key', bool, required=False, description='Only return certificate-key pairs with keys'), OpenApiParameter('key_type', OpenApiTypes.STR, required=False, many=True, enum=[choice[0] for choice in KeyType.choices], description="Filter by key algorithm type (RSA, EC, DSA, etc). Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')")])
def
list(self, request, *args, **kwargs):
@extend_schema(
    parameters=[
        # Override the type for `has_key` above
        OpenApiParameter(
            name="has_key",
            type=bool,
            description="Only return certificate-key pairs with keys",
            required=False,
        ),
        OpenApiParameter(
            name="key_type",
            type=OpenApiTypes.STR,
            description=(
                "Filter by key algorithm type (RSA, EC, DSA, etc). "
                "Can be specified multiple times (e.g. '?key_type=rsa&key_type=ec')"
            ),
            # First element of every `KeyType.choices` entry.
            enum=[entry[0] for entry in KeyType.choices],
            many=True,
            required=False,
        ),
    ]
)
def list(self, request, *args, **kwargs):
    # Overridden only to attach the schema parameters above; the listing itself
    # is delegated unchanged to the parent class.
    return super().list(request, *args, **kwargs)
@extend_schema(request=CertificateGenerationSerializer(), responses={200: CertificateKeyPairSerializer, 400: OpenApiResponse(description='Bad request')})
@action(detail=False, methods=['POST'])
@validate(CertificateGenerationSerializer)
def
generate( self, request: rest_framework.request.Request, body: CertificateGenerationSerializer) -> rest_framework.response.Response:
@permission_required(None, ["authentik_crypto.add_certificatekeypair"])
@extend_schema(
    request=CertificateGenerationSerializer(),
    responses={
        200: CertificateKeyPairSerializer,
        400: OpenApiResponse(description="Bad request"),
    },
)
@action(detail=False, methods=["POST"])
@validate(CertificateGenerationSerializer)
def generate(self, request: Request, body: CertificateGenerationSerializer) -> Response:
    """Generate a new, self-signed certificate-key pair"""
    params = body.validated_data
    # `subject_alt_name` is optional; a missing value is treated like an empty one.
    raw_san = params.get("subject_alt_name", "")
    # Trim whitespace around every comma-separated entry and drop empty entries
    # (e.g. from a trailing comma), so "a.example, 10.0.0.1" yields clean names
    # instead of a space-prefixed or blank one.
    sans = [san.strip() for san in raw_san.split(",") if san.strip()]
    builder = CertificateBuilder(params["name"])
    builder.alg = params["alg"]
    builder.build(
        subject_alt_names=sans,
        validity_days=int(params["validity_days"]),
    )
    # Persist the generated pair and return it through the viewset's serializer.
    instance = builder.save()
    serializer = self.get_serializer(instance)
    return Response(serializer.data)
Generate a new, self-signed certificate-key pair
@extend_schema(parameters=[OpenApiParameter(name='download', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: CertificateDataSerializer(many=False)})
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def
view_certificate( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
@permission_required("view_certificatekeypair_certificate")
@extend_schema(
    parameters=[
        OpenApiParameter(
            name="download",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.BOOL,
        )
    ],
    responses={200: CertificateDataSerializer(many=False)},
)
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def view_certificate(self, request: Request, pk: str) -> Response:
    """Return certificate-key pairs certificate and log access"""
    keypair: CertificateKeyPair = self.get_object()
    # Record a SECRET_VIEW event for every caller except internal service accounts.
    if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
        Event.new(  # noqa # nosec
            EventAction.SECRET_VIEW,
            secret=keypair,
            type="certificate",
        ).from_http(request)
    # Without a `download` query parameter (only its presence matters, not its
    # value) the PEM text is returned through CertificateDataSerializer.
    if "download" not in request.query_params:
        return Response(CertificateDataSerializer({"data": keypair.certificate_data}).data)
    # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
    attachment = HttpResponse(keypair.certificate_data, content_type="application/x-pem-file")
    attachment["Content-Disposition"] = (
        f'attachment; filename="{keypair.name}_certificate.pem"'
    )
    return attachment
Return certificate-key pairs certificate and log access
@extend_schema(parameters=[OpenApiParameter(name='download', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: CertificateDataSerializer(many=False)})
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def
view_private_key( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
@permission_required("view_certificatekeypair_key")
@extend_schema(
    parameters=[
        OpenApiParameter(
            name="download",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.BOOL,
        )
    ],
    responses={200: CertificateDataSerializer(many=False)},
)
@action(detail=True, pagination_class=None, permission_classes=[IsAuthenticated])
def view_private_key(self, request: Request, pk: str) -> Response:
    """Return certificate-key pairs private key and log access"""
    keypair: CertificateKeyPair = self.get_object()
    # Record a SECRET_VIEW event for every caller except internal service accounts.
    if request.user.type != UserTypes.INTERNAL_SERVICE_ACCOUNT:
        Event.new(  # noqa # nosec
            EventAction.SECRET_VIEW,
            secret=keypair,
            type="private_key",
        ).from_http(request)
    # Without a `download` query parameter (only its presence matters, not its
    # value) the PEM text is returned through CertificateDataSerializer.
    if "download" not in request.query_params:
        return Response(CertificateDataSerializer({"data": keypair.key_data}).data)
    # Mime type from https://pki-tutorial.readthedocs.io/en/latest/mime.html
    attachment = HttpResponse(keypair.key_data, content_type="application/x-pem-file")
    attachment["Content-Disposition"] = (
        f'attachment; filename="{keypair.name}_private_key.pem"'
    )
    return attachment
Return certificate-key pairs private key and log access