authentik.admin.api.system

authentik administration overview

  1"""authentik administration overview"""
  2
  3import platform
  4from datetime import datetime
  5from ssl import OPENSSL_VERSION
  6from sys import version as python_version
  7from typing import TypedDict
  8
  9from cryptography.hazmat.backends.openssl.backend import backend
 10from django.conf import settings
 11from django.utils.timezone import now
 12from django.views.debug import SafeExceptionReporterFilter
 13from drf_spectacular.utils import extend_schema
 14from rest_framework.fields import SerializerMethodField
 15from rest_framework.request import Request
 16from rest_framework.response import Response
 17from rest_framework.views import APIView
 18
 19from authentik import authentik_full_version
 20from authentik.core.api.utils import PassiveSerializer
 21from authentik.lib.config import CONFIG
 22from authentik.lib.utils.reflection import get_env
 23from authentik.outposts.apps import MANAGED_OUTPOST
 24from authentik.outposts.models import Outpost
 25from authentik.rbac.permissions import HasPermission
 26
 27
 28def fips_enabled():
 29    try:
 30        from authentik.enterprise.license import LicenseKey
 31
 32        return backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
 33    except ModuleNotFoundError:
 34        return None
 35
 36
class RuntimeDict(TypedDict):
    """Runtime information"""

    # Shape of the mapping built by SystemInfoSerializer.get_runtime. The
    # note on each field names the value get_runtime assigns to that key.

    python_version: str  # sys.version
    environment: str  # get_env()
    architecture: str  # platform.machine()
    platform: str  # platform.platform()
    uname: str  # platform.uname() fields joined with single spaces
    openssl_version: str  # ssl.OPENSSL_VERSION
    openssl_fips_enabled: bool | None  # fips_enabled(); may be None
    authentik_version: str  # authentik_full_version()
 48
 49
 50class SystemInfoSerializer(PassiveSerializer):
 51    """Get system information."""
 52
 53    http_headers = SerializerMethodField()
 54    http_host = SerializerMethodField()
 55    http_is_secure = SerializerMethodField()
 56    runtime = SerializerMethodField()
 57    brand = SerializerMethodField()
 58    server_time = SerializerMethodField()
 59    embedded_outpost_disabled = SerializerMethodField()
 60    embedded_outpost_host = SerializerMethodField()
 61
 62    def get_http_headers(self, request: Request) -> dict[str, str]:
 63        """Get HTTP Request headers"""
 64        headers = {}
 65        raw_session = request._request.COOKIES.get(settings.SESSION_COOKIE_NAME)
 66        for key, value in request.META.items():
 67            if not isinstance(value, str):
 68                continue
 69            actual_value = value
 70            if raw_session is not None and raw_session in actual_value:
 71                actual_value = actual_value.replace(
 72                    raw_session, SafeExceptionReporterFilter.cleansed_substitute
 73                )
 74            headers[key] = actual_value
 75        return headers
 76
 77    def get_http_host(self, request: Request) -> str:
 78        """Get HTTP host"""
 79        return request._request.get_host()
 80
 81    def get_http_is_secure(self, request: Request) -> bool:
 82        """Get HTTP Secure flag"""
 83        return request._request.is_secure()
 84
 85    def get_runtime(self, request: Request) -> RuntimeDict:
 86        """Get versions"""
 87        return {
 88            "architecture": platform.machine(),
 89            "authentik_version": authentik_full_version(),
 90            "environment": get_env(),
 91            "openssl_fips_enabled": fips_enabled(),
 92            "openssl_version": OPENSSL_VERSION,
 93            "platform": platform.platform(),
 94            "python_version": python_version,
 95            "uname": " ".join(platform.uname()),
 96        }
 97
 98    def get_brand(self, request: Request) -> str:
 99        """Currently active brand"""
100        return str(request._request.brand)
101
102    def get_server_time(self, request: Request) -> datetime:
103        """Current server time"""
104        return now()
105
106    def get_embedded_outpost_disabled(self, request: Request) -> bool:
107        """Whether the embedded outpost is disabled"""
108        return CONFIG.get_bool("outposts.disable_embedded_outpost", False)
109
110    def get_embedded_outpost_host(self, request: Request) -> str:
111        """Get the FQDN configured on the embedded outpost"""
112        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
113        if not outposts.exists():  # pragma: no cover
114            return ""
115        return outposts.first().config.authentik_host
116
117
class SystemView(APIView):
    """Get system information."""

    # GET and POST are both accepted and produce the same payload: the
    # SystemInfoSerializer output computed from the incoming request.
    # NOTE(review): the docstrings here are presumably surfaced in the
    # generated API schema by drf-spectacular -- confirm before rewording.

    serializer_class = SystemInfoSerializer
    permission_classes = [HasPermission("authentik_rbac.view_system_info")]
    # No filtering or pagination: the response is a single object.
    filter_backends = []
    pagination_class = None

    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
    def get(self, request: Request) -> Response:
        """Get system information."""
        # The request itself is the serializer's instance; every field is
        # derived from it by the serializer's get_* methods.
        system_info = SystemInfoSerializer(request)
        return Response(system_info.data)

    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
    def post(self, request: Request) -> Response:
        """Get system information."""
        # Same payload as ``get``; any request body is not consulted here.
        system_info = SystemInfoSerializer(request)
        return Response(system_info.data)
def fips_enabled():
29def fips_enabled():
30    try:
31        from authentik.enterprise.license import LicenseKey
32
33        return backend._fips_enabled if LicenseKey.get_total().status().is_valid else None
34    except ModuleNotFoundError:
35        return None
class RuntimeDict(typing.TypedDict):
38class RuntimeDict(TypedDict):
39    """Runtime information"""
40
41    python_version: str
42    environment: str
43    architecture: str
44    platform: str
45    uname: str
46    openssl_version: str
47    openssl_fips_enabled: bool | None
48    authentik_version: str

Runtime information

python_version: str
environment: str
architecture: str
platform: str
uname: str
openssl_version: str
openssl_fips_enabled: bool | None
authentik_version: str
class SystemInfoSerializer(authentik.core.api.utils.PassiveSerializer):
 51class SystemInfoSerializer(PassiveSerializer):
 52    """Get system information."""
 53
 54    http_headers = SerializerMethodField()
 55    http_host = SerializerMethodField()
 56    http_is_secure = SerializerMethodField()
 57    runtime = SerializerMethodField()
 58    brand = SerializerMethodField()
 59    server_time = SerializerMethodField()
 60    embedded_outpost_disabled = SerializerMethodField()
 61    embedded_outpost_host = SerializerMethodField()
 62
 63    def get_http_headers(self, request: Request) -> dict[str, str]:
 64        """Get HTTP Request headers"""
 65        headers = {}
 66        raw_session = request._request.COOKIES.get(settings.SESSION_COOKIE_NAME)
 67        for key, value in request.META.items():
 68            if not isinstance(value, str):
 69                continue
 70            actual_value = value
 71            if raw_session is not None and raw_session in actual_value:
 72                actual_value = actual_value.replace(
 73                    raw_session, SafeExceptionReporterFilter.cleansed_substitute
 74                )
 75            headers[key] = actual_value
 76        return headers
 77
 78    def get_http_host(self, request: Request) -> str:
 79        """Get HTTP host"""
 80        return request._request.get_host()
 81
 82    def get_http_is_secure(self, request: Request) -> bool:
 83        """Get HTTP Secure flag"""
 84        return request._request.is_secure()
 85
 86    def get_runtime(self, request: Request) -> RuntimeDict:
 87        """Get versions"""
 88        return {
 89            "architecture": platform.machine(),
 90            "authentik_version": authentik_full_version(),
 91            "environment": get_env(),
 92            "openssl_fips_enabled": fips_enabled(),
 93            "openssl_version": OPENSSL_VERSION,
 94            "platform": platform.platform(),
 95            "python_version": python_version,
 96            "uname": " ".join(platform.uname()),
 97        }
 98
 99    def get_brand(self, request: Request) -> str:
100        """Currently active brand"""
101        return str(request._request.brand)
102
103    def get_server_time(self, request: Request) -> datetime:
104        """Current server time"""
105        return now()
106
107    def get_embedded_outpost_disabled(self, request: Request) -> bool:
108        """Whether the embedded outpost is disabled"""
109        return CONFIG.get_bool("outposts.disable_embedded_outpost", False)
110
111    def get_embedded_outpost_host(self, request: Request) -> str:
112        """Get the FQDN configured on the embedded outpost"""
113        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
114        if not outposts.exists():  # pragma: no cover
115            return ""
116        return outposts.first().config.authentik_host

Get system information.

http_headers
http_host
http_is_secure
runtime
brand
server_time
embedded_outpost_disabled
embedded_outpost_host
def get_http_headers(self, request: rest_framework.request.Request) -> dict[str, str]:
63    def get_http_headers(self, request: Request) -> dict[str, str]:
64        """Get HTTP Request headers"""
65        headers = {}
66        raw_session = request._request.COOKIES.get(settings.SESSION_COOKIE_NAME)
67        for key, value in request.META.items():
68            if not isinstance(value, str):
69                continue
70            actual_value = value
71            if raw_session is not None and raw_session in actual_value:
72                actual_value = actual_value.replace(
73                    raw_session, SafeExceptionReporterFilter.cleansed_substitute
74                )
75            headers[key] = actual_value
76        return headers

Get HTTP Request headers

def get_http_host(self, request: rest_framework.request.Request) -> str:
78    def get_http_host(self, request: Request) -> str:
79        """Get HTTP host"""
80        return request._request.get_host()

Get HTTP host

def get_http_is_secure(self, request: rest_framework.request.Request) -> bool:
82    def get_http_is_secure(self, request: Request) -> bool:
83        """Get HTTP Secure flag"""
84        return request._request.is_secure()

Get HTTP Secure flag

def get_runtime( self, request: rest_framework.request.Request) -> RuntimeDict:
86    def get_runtime(self, request: Request) -> RuntimeDict:
87        """Get versions"""
88        return {
89            "architecture": platform.machine(),
90            "authentik_version": authentik_full_version(),
91            "environment": get_env(),
92            "openssl_fips_enabled": fips_enabled(),
93            "openssl_version": OPENSSL_VERSION,
94            "platform": platform.platform(),
95            "python_version": python_version,
96            "uname": " ".join(platform.uname()),
97        }

Get versions

def get_brand(self, request: rest_framework.request.Request) -> str:
 99    def get_brand(self, request: Request) -> str:
100        """Currently active brand"""
101        return str(request._request.brand)

Currently active brand

def get_server_time(self, request: rest_framework.request.Request) -> datetime.datetime:
103    def get_server_time(self, request: Request) -> datetime:
104        """Current server time"""
105        return now()

Current server time

def get_embedded_outpost_disabled(self, request: rest_framework.request.Request) -> bool:
107    def get_embedded_outpost_disabled(self, request: Request) -> bool:
108        """Whether the embedded outpost is disabled"""
109        return CONFIG.get_bool("outposts.disable_embedded_outpost", False)

Whether the embedded outpost is disabled

def get_embedded_outpost_host(self, request: rest_framework.request.Request) -> str:
111    def get_embedded_outpost_host(self, request: Request) -> str:
112        """Get the FQDN configured on the embedded outpost"""
113        outposts = Outpost.objects.filter(managed=MANAGED_OUTPOST)
114        if not outposts.exists():  # pragma: no cover
115            return ""
116        return outposts.first().config.authentik_host

Get the FQDN configured on the embedded outpost

class SystemView(rest_framework.views.APIView):
119class SystemView(APIView):
120    """Get system information."""
121
122    permission_classes = [HasPermission("authentik_rbac.view_system_info")]
123    pagination_class = None
124    filter_backends = []
125    serializer_class = SystemInfoSerializer
126
127    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
128    def get(self, request: Request) -> Response:
129        """Get system information."""
130        return Response(SystemInfoSerializer(request).data)
131
132    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
133    def post(self, request: Request) -> Response:
134        """Get system information."""
135        return Response(SystemInfoSerializer(request).data)

Get system information.

permission_classes = [<class 'authentik.rbac.permissions.HasPermission.<locals>.checker'>]
pagination_class = None
filter_backends = []
serializer_class = <class 'SystemInfoSerializer'>
@extend_schema(responses={200: SystemInfoSerializer(many=False)})
def get( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
127    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
128    def get(self, request: Request) -> Response:
129        """Get system information."""
130        return Response(SystemInfoSerializer(request).data)

Get system information.

@extend_schema(responses={200: SystemInfoSerializer(many=False)})
def post( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
132    @extend_schema(responses={200: SystemInfoSerializer(many=False)})
133    def post(self, request: Request) -> Response:
134        """Get system information."""
135        return Response(SystemInfoSerializer(request).data)

Get system information.