authentik.providers.rac.api.endpoints

RAC Provider API Views

  1"""RAC Provider API Views"""
  2
  3from django.core.cache import cache
  4from django.db.models import QuerySet
  5from django.urls import reverse
  6from drf_spectacular.types import OpenApiTypes
  7from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema
  8from rest_framework.fields import SerializerMethodField
  9from rest_framework.request import Request
 10from rest_framework.response import Response
 11from rest_framework.viewsets import ModelViewSet
 12from structlog.stdlib import get_logger
 13
 14from authentik.core.api.used_by import UsedByMixin
 15from authentik.core.api.utils import ModelSerializer
 16from authentik.core.models import Provider
 17from authentik.policies.engine import PolicyEngine
 18from authentik.providers.rac.api.providers import RACProviderSerializer
 19from authentik.providers.rac.models import Endpoint
 20from authentik.rbac.filters import ObjectFilter
 21
 22LOGGER = get_logger()
 23
 24
 25def user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str:
 26    """Cache key where endpoint list for user is saved"""
 27    return f"goauthentik.io/providers/rac/endpoint_access/{user_pk}/{provider_pk}"
 28
 29
 30class EndpointSerializer(ModelSerializer):
 31    """Endpoint Serializer"""
 32
 33    provider_obj = RACProviderSerializer(source="provider", read_only=True)
 34    launch_url = SerializerMethodField()
 35
 36    def get_launch_url(self, endpoint: Endpoint) -> str | None:
 37        """Build actual launch URL (the provider itself does not have one, just
 38        individual endpoints)"""
 39        try:
 40
 41            return reverse(
 42                "authentik_providers_rac:start",
 43                kwargs={"app": endpoint.provider.application.slug, "endpoint": endpoint.pk},
 44            )
 45        except Provider.application.RelatedObjectDoesNotExist:
 46            return None
 47
 48    class Meta:
 49        model = Endpoint
 50        fields = [
 51            "pk",
 52            "name",
 53            "provider",
 54            "provider_obj",
 55            "protocol",
 56            "host",
 57            "settings",
 58            "property_mappings",
 59            "auth_mode",
 60            "launch_url",
 61            "maximum_connections",
 62        ]
 63
 64
 65class EndpointViewSet(UsedByMixin, ModelViewSet):
 66    """Endpoint Viewset"""
 67
 68    queryset = Endpoint.objects.all()
 69    serializer_class = EndpointSerializer
 70    filterset_fields = ["name", "provider"]
 71    search_fields = ["name", "protocol"]
 72    ordering = ["name", "protocol"]
 73
 74    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
 75        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
 76        for backend in list(self.filter_backends):
 77            if backend == ObjectFilter:
 78                continue
 79            queryset = backend().filter_queryset(self.request, queryset, self)
 80        return queryset
 81
 82    def _get_allowed_endpoints(self, queryset: QuerySet) -> list[Endpoint]:
 83        endpoints = []
 84        for endpoint in queryset:
 85            engine = PolicyEngine(endpoint, self.request.user, self.request)
 86            engine.build()
 87            if engine.passing:
 88                endpoints.append(endpoint)
 89        return endpoints
 90
 91    @extend_schema(
 92        parameters=[
 93            OpenApiParameter(
 94                "search",
 95                OpenApiTypes.STR,
 96            ),
 97            OpenApiParameter(
 98                name="superuser_full_list",
 99                location=OpenApiParameter.QUERY,
100                type=OpenApiTypes.BOOL,
101            ),
102        ],
103        responses={
104            200: EndpointSerializer(many=True),
105            400: OpenApiResponse(description="Bad request"),
106        },
107    )
108    def list(self, request: Request, *args, **kwargs) -> Response:
109        """List accessible endpoints"""
110        should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params
111
112        superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true"
113        if superuser_full_list and request.user.is_superuser:
114            return super().list(request)
115
116        queryset = self._filter_queryset_for_list(self.get_queryset())
117        self.paginate_queryset(queryset)
118
119        allowed_endpoints = []
120        if not should_cache:
121            allowed_endpoints = self._get_allowed_endpoints(queryset)
122        if should_cache:
123            provider = request.query_params.get("provider")
124            allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider))
125            if not allowed_endpoints:
126                LOGGER.debug("Caching allowed endpoint list")
127                allowed_endpoints = self._get_allowed_endpoints(queryset)
128                cache.set(
129                    user_endpoint_cache_key(self.request.user.pk, provider),
130                    allowed_endpoints,
131                    timeout=86400,
132                )
133        serializer = self.get_serializer(allowed_endpoints, many=True)
134        return self.get_paginated_response(serializer.data)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str:
def user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str:
    """Build the cache key under which the endpoint list of one user for one
    provider is stored.

    Both identifiers are interpolated with f-string formatting, so any value
    with a string representation is accepted.
    """
    key_prefix = "goauthentik.io/providers/rac/endpoint_access"
    return f"{key_prefix}/{user_pk}/{provider_pk}"

Cache key where endpoint list for user is saved

class EndpointSerializer(authentik.core.api.utils.ModelSerializer):
class EndpointSerializer(ModelSerializer):
    """Endpoint Serializer"""

    # Nested, read-only representation of the endpoint's provider.
    provider_obj = RACProviderSerializer(source="provider", read_only=True)
    # Computed per endpoint by get_launch_url below.
    launch_url = SerializerMethodField()

    def get_launch_url(self, endpoint: Endpoint) -> str | None:
        """Build actual launch URL (the provider itself does not have one, just
        individual endpoints)"""
        # The URL needs the slug of the application attached to the provider;
        # without an application there is nothing to launch, so report None.
        try:
            app_slug = endpoint.provider.application.slug
        except Provider.application.RelatedObjectDoesNotExist:
            return None
        return reverse(
            "authentik_providers_rac:start",
            kwargs={"app": app_slug, "endpoint": endpoint.pk},
        )

    class Meta:
        model = Endpoint
        # Serialized fields, in output order.
        fields = [
            "pk",
            "name",
            "provider",
            "provider_obj",
            "protocol",
            "host",
            "settings",
            "property_mappings",
            "auth_mode",
            "launch_url",
            "maximum_connections",
        ]

Endpoint Serializer

provider_obj
launch_url
def get_launch_url(self, endpoint: authentik.providers.rac.models.Endpoint) -> str | None:
37    def get_launch_url(self, endpoint: Endpoint) -> str | None:
38        """Build actual launch URL (the provider itself does not have one, just
39        individual endpoints)"""
40        try:
41
42            return reverse(
43                "authentik_providers_rac:start",
44                kwargs={"app": endpoint.provider.application.slug, "endpoint": endpoint.pk},
45            )
46        except Provider.application.RelatedObjectDoesNotExist:
47            return None

Build actual launch URL (the provider itself does not have one, just individual endpoints)

class EndpointSerializer.Meta:
    class Meta:
        # Model whose instances this serializer represents.
        model = Endpoint
        # Serialized fields, in output order. "provider_obj" and "launch_url"
        # are declared on the serializer itself rather than taken from the model.
        fields = [
            "pk",
            "name",
            "provider",
            "provider_obj",
            "protocol",
            "host",
            "settings",
            "property_mappings",
            "auth_mode",
            "launch_url",
            "maximum_connections",
        ]
fields = ['pk', 'name', 'provider', 'provider_obj', 'protocol', 'host', 'settings', 'property_mappings', 'auth_mode', 'launch_url', 'maximum_connections']
class EndpointViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
 66class EndpointViewSet(UsedByMixin, ModelViewSet):
 67    """Endpoint Viewset"""
 68
 69    queryset = Endpoint.objects.all()
 70    serializer_class = EndpointSerializer
 71    filterset_fields = ["name", "provider"]
 72    search_fields = ["name", "protocol"]
 73    ordering = ["name", "protocol"]
 74
 75    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
 76        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
 77        for backend in list(self.filter_backends):
 78            if backend == ObjectFilter:
 79                continue
 80            queryset = backend().filter_queryset(self.request, queryset, self)
 81        return queryset
 82
 83    def _get_allowed_endpoints(self, queryset: QuerySet) -> list[Endpoint]:
 84        endpoints = []
 85        for endpoint in queryset:
 86            engine = PolicyEngine(endpoint, self.request.user, self.request)
 87            engine.build()
 88            if engine.passing:
 89                endpoints.append(endpoint)
 90        return endpoints
 91
 92    @extend_schema(
 93        parameters=[
 94            OpenApiParameter(
 95                "search",
 96                OpenApiTypes.STR,
 97            ),
 98            OpenApiParameter(
 99                name="superuser_full_list",
100                location=OpenApiParameter.QUERY,
101                type=OpenApiTypes.BOOL,
102            ),
103        ],
104        responses={
105            200: EndpointSerializer(many=True),
106            400: OpenApiResponse(description="Bad request"),
107        },
108    )
109    def list(self, request: Request, *args, **kwargs) -> Response:
110        """List accessible endpoints"""
111        should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params
112
113        superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true"
114        if superuser_full_list and request.user.is_superuser:
115            return super().list(request)
116
117        queryset = self._filter_queryset_for_list(self.get_queryset())
118        self.paginate_queryset(queryset)
119
120        allowed_endpoints = []
121        if not should_cache:
122            allowed_endpoints = self._get_allowed_endpoints(queryset)
123        if should_cache:
124            provider = request.query_params.get("provider")
125            allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider))
126            if not allowed_endpoints:
127                LOGGER.debug("Caching allowed endpoint list")
128                allowed_endpoints = self._get_allowed_endpoints(queryset)
129                cache.set(
130                    user_endpoint_cache_key(self.request.user.pk, provider),
131                    allowed_endpoints,
132                    timeout=86400,
133                )
134        serializer = self.get_serializer(allowed_endpoints, many=True)
135        return self.get_paginated_response(serializer.data)

Endpoint Viewset

queryset = <InheritanceQuerySet []>
serializer_class = <class 'EndpointSerializer'>
filterset_fields = ['name', 'provider']
search_fields = ['name', 'protocol']
ordering = ['name', 'protocol']
@extend_schema(parameters=[OpenApiParameter('search', OpenApiTypes.STR), OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: EndpointSerializer(many=True), 400: OpenApiResponse(description='Bad request')})
def list( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
 92    @extend_schema(
 93        parameters=[
 94            OpenApiParameter(
 95                "search",
 96                OpenApiTypes.STR,
 97            ),
 98            OpenApiParameter(
 99                name="superuser_full_list",
100                location=OpenApiParameter.QUERY,
101                type=OpenApiTypes.BOOL,
102            ),
103        ],
104        responses={
105            200: EndpointSerializer(many=True),
106            400: OpenApiResponse(description="Bad request"),
107        },
108    )
109    def list(self, request: Request, *args, **kwargs) -> Response:
110        """List accessible endpoints"""
111        should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params
112
113        superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true"
114        if superuser_full_list and request.user.is_superuser:
115            return super().list(request)
116
117        queryset = self._filter_queryset_for_list(self.get_queryset())
118        self.paginate_queryset(queryset)
119
120        allowed_endpoints = []
121        if not should_cache:
122            allowed_endpoints = self._get_allowed_endpoints(queryset)
123        if should_cache:
124            provider = request.query_params.get("provider")
125            allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider))
126            if not allowed_endpoints:
127                LOGGER.debug("Caching allowed endpoint list")
128                allowed_endpoints = self._get_allowed_endpoints(queryset)
129                cache.set(
130                    user_endpoint_cache_key(self.request.user.pk, provider),
131                    allowed_endpoints,
132                    timeout=86400,
133                )
134        serializer = self.get_serializer(allowed_endpoints, many=True)
135        return self.get_paginated_response(serializer.data)

List accessible endpoints

name = None
description = None
suffix = None
detail = None
basename = None