authentik.providers.rac.api.endpoints
RAC Provider API Views
1"""RAC Provider API Views""" 2 3from django.core.cache import cache 4from django.db.models import QuerySet 5from django.urls import reverse 6from drf_spectacular.types import OpenApiTypes 7from drf_spectacular.utils import OpenApiParameter, OpenApiResponse, extend_schema 8from rest_framework.fields import SerializerMethodField 9from rest_framework.request import Request 10from rest_framework.response import Response 11from rest_framework.viewsets import ModelViewSet 12from structlog.stdlib import get_logger 13 14from authentik.core.api.used_by import UsedByMixin 15from authentik.core.api.utils import ModelSerializer 16from authentik.core.models import Provider 17from authentik.policies.engine import PolicyEngine 18from authentik.providers.rac.api.providers import RACProviderSerializer 19from authentik.providers.rac.models import Endpoint 20from authentik.rbac.filters import ObjectFilter 21 22LOGGER = get_logger() 23 24 25def user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str: 26 """Cache key where endpoint list for user is saved""" 27 return f"goauthentik.io/providers/rac/endpoint_access/{user_pk}/{provider_pk}" 28 29 30class EndpointSerializer(ModelSerializer): 31 """Endpoint Serializer""" 32 33 provider_obj = RACProviderSerializer(source="provider", read_only=True) 34 launch_url = SerializerMethodField() 35 36 def get_launch_url(self, endpoint: Endpoint) -> str | None: 37 """Build actual launch URL (the provider itself does not have one, just 38 individual endpoints)""" 39 try: 40 41 return reverse( 42 "authentik_providers_rac:start", 43 kwargs={"app": endpoint.provider.application.slug, "endpoint": endpoint.pk}, 44 ) 45 except Provider.application.RelatedObjectDoesNotExist: 46 return None 47 48 class Meta: 49 model = Endpoint 50 fields = [ 51 "pk", 52 "name", 53 "provider", 54 "provider_obj", 55 "protocol", 56 "host", 57 "settings", 58 "property_mappings", 59 "auth_mode", 60 "launch_url", 61 "maximum_connections", 62 ] 63 64 65class EndpointViewSet(UsedByMixin, ModelViewSet): 66 """Endpoint Viewset""" 67 68 queryset = Endpoint.objects.all() 69 serializer_class = EndpointSerializer 70 filterset_fields = ["name", "provider"] 71 search_fields = ["name", "protocol"] 72 ordering = ["name", "protocol"] 73 74 def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet: 75 """Custom filter_queryset method which ignores guardian, but still supports sorting""" 76 for backend in list(self.filter_backends): 77 if backend == ObjectFilter: 78 continue 79 queryset = backend().filter_queryset(self.request, queryset, self) 80 return queryset 81 82 def _get_allowed_endpoints(self, queryset: QuerySet) -> list[Endpoint]: 83 endpoints = [] 84 for endpoint in queryset: 85 engine = PolicyEngine(endpoint, self.request.user, self.request) 86 engine.build() 87 if engine.passing: 88 endpoints.append(endpoint) 89 return endpoints 90 91 @extend_schema( 92 parameters=[ 93 OpenApiParameter( 94 "search", 95 OpenApiTypes.STR, 96 ), 97 OpenApiParameter( 98 name="superuser_full_list", 99 location=OpenApiParameter.QUERY, 100 type=OpenApiTypes.BOOL, 101 ), 102 ], 103 responses={ 104 200: EndpointSerializer(many=True), 105 400: OpenApiResponse(description="Bad request"), 106 }, 107 ) 108 def list(self, request: Request, *args, **kwargs) -> Response: 109 """List accessible endpoints""" 110 should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params 111 112 superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true" 113 if superuser_full_list and request.user.is_superuser: 114 return super().list(request) 115 116 queryset = self._filter_queryset_for_list(self.get_queryset()) 117 self.paginate_queryset(queryset) 118 119 allowed_endpoints = [] 120 if not should_cache: 121 allowed_endpoints = self._get_allowed_endpoints(queryset) 122 if should_cache: 123 provider = request.query_params.get("provider") 124 allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider)) 125 if not allowed_endpoints: 126 LOGGER.debug("Caching allowed endpoint list") 127 allowed_endpoints = self._get_allowed_endpoints(queryset) 128 cache.set( 129 user_endpoint_cache_key(self.request.user.pk, provider), 130 allowed_endpoints, 131 timeout=86400, 132 ) 133 serializer = self.get_serializer(allowed_endpoints, many=True) 134 return self.get_paginated_response(serializer.data)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str:
26def user_endpoint_cache_key(user_pk: str, provider_pk: str) -> str: 27 """Cache key where endpoint list for user is saved""" 28 return f"goauthentik.io/providers/rac/endpoint_access/{user_pk}/{provider_pk}"
Cache key where endpoint list for user is saved
31class EndpointSerializer(ModelSerializer): 32 """Endpoint Serializer""" 33 34 provider_obj = RACProviderSerializer(source="provider", read_only=True) 35 launch_url = SerializerMethodField() 36 37 def get_launch_url(self, endpoint: Endpoint) -> str | None: 38 """Build actual launch URL (the provider itself does not have one, just 39 individual endpoints)""" 40 try: 41 42 return reverse( 43 "authentik_providers_rac:start", 44 kwargs={"app": endpoint.provider.application.slug, "endpoint": endpoint.pk}, 45 ) 46 except Provider.application.RelatedObjectDoesNotExist: 47 return None 48 49 class Meta: 50 model = Endpoint 51 fields = [ 52 "pk", 53 "name", 54 "provider", 55 "provider_obj", 56 "protocol", 57 "host", 58 "settings", 59 "property_mappings", 60 "auth_mode", 61 "launch_url", 62 "maximum_connections", 63 ]
Endpoint Serializer
37 def get_launch_url(self, endpoint: Endpoint) -> str | None: 38 """Build actual launch URL (the provider itself does not have one, just 39 individual endpoints)""" 40 try: 41 42 return reverse( 43 "authentik_providers_rac:start", 44 kwargs={"app": endpoint.provider.application.slug, "endpoint": endpoint.pk}, 45 ) 46 except Provider.application.RelatedObjectDoesNotExist: 47 return None
Build actual launch URL (the provider itself does not have one, just individual endpoints)
Inherited Members
class
EndpointSerializer.Meta:
49 class Meta: 50 model = Endpoint 51 fields = [ 52 "pk", 53 "name", 54 "provider", 55 "provider_obj", 56 "protocol", 57 "host", 58 "settings", 59 "property_mappings", 60 "auth_mode", 61 "launch_url", 62 "maximum_connections", 63 ]
model =
<class 'authentik.providers.rac.models.Endpoint'>
class
EndpointViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
66class EndpointViewSet(UsedByMixin, ModelViewSet): 67 """Endpoint Viewset""" 68 69 queryset = Endpoint.objects.all() 70 serializer_class = EndpointSerializer 71 filterset_fields = ["name", "provider"] 72 search_fields = ["name", "protocol"] 73 ordering = ["name", "protocol"] 74 75 def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet: 76 """Custom filter_queryset method which ignores guardian, but still supports sorting""" 77 for backend in list(self.filter_backends): 78 if backend == ObjectFilter: 79 continue 80 queryset = backend().filter_queryset(self.request, queryset, self) 81 return queryset 82 83 def _get_allowed_endpoints(self, queryset: QuerySet) -> list[Endpoint]: 84 endpoints = [] 85 for endpoint in queryset: 86 engine = PolicyEngine(endpoint, self.request.user, self.request) 87 engine.build() 88 if engine.passing: 89 endpoints.append(endpoint) 90 return endpoints 91 92 @extend_schema( 93 parameters=[ 94 OpenApiParameter( 95 "search", 96 OpenApiTypes.STR, 97 ), 98 OpenApiParameter( 99 name="superuser_full_list", 100 location=OpenApiParameter.QUERY, 101 type=OpenApiTypes.BOOL, 102 ), 103 ], 104 responses={ 105 200: EndpointSerializer(many=True), 106 400: OpenApiResponse(description="Bad request"), 107 }, 108 ) 109 def list(self, request: Request, *args, **kwargs) -> Response: 110 """List accessible endpoints""" 111 should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params 112 113 superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true" 114 if superuser_full_list and request.user.is_superuser: 115 return super().list(request) 116 117 queryset = self._filter_queryset_for_list(self.get_queryset()) 118 self.paginate_queryset(queryset) 119 120 allowed_endpoints = [] 121 if not should_cache: 122 allowed_endpoints = self._get_allowed_endpoints(queryset) 123 if should_cache: 124 provider = request.query_params.get("provider") 125 allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider)) 126 if not allowed_endpoints: 127 LOGGER.debug("Caching allowed endpoint list") 128 allowed_endpoints = self._get_allowed_endpoints(queryset) 129 cache.set( 130 user_endpoint_cache_key(self.request.user.pk, provider), 131 allowed_endpoints, 132 timeout=86400, 133 ) 134 serializer = self.get_serializer(allowed_endpoints, many=True) 135 return self.get_paginated_response(serializer.data)
Endpoint Viewset
serializer_class =
<class 'EndpointSerializer'>
@extend_schema(parameters=[OpenApiParameter('search', OpenApiTypes.STR), OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)], responses={200: EndpointSerializer(many=True), 400: OpenApiResponse(description='Bad request')})
def
list( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
92 @extend_schema( 93 parameters=[ 94 OpenApiParameter( 95 "search", 96 OpenApiTypes.STR, 97 ), 98 OpenApiParameter( 99 name="superuser_full_list", 100 location=OpenApiParameter.QUERY, 101 type=OpenApiTypes.BOOL, 102 ), 103 ], 104 responses={ 105 200: EndpointSerializer(many=True), 106 400: OpenApiResponse(description="Bad request"), 107 }, 108 ) 109 def list(self, request: Request, *args, **kwargs) -> Response: 110 """List accessible endpoints""" 111 should_cache = request.GET.get("search", "") == "" and "provider" in request.query_params 112 113 superuser_full_list = str(request.GET.get("superuser_full_list", "false")).lower() == "true" 114 if superuser_full_list and request.user.is_superuser: 115 return super().list(request) 116 117 queryset = self._filter_queryset_for_list(self.get_queryset()) 118 self.paginate_queryset(queryset) 119 120 allowed_endpoints = [] 121 if not should_cache: 122 allowed_endpoints = self._get_allowed_endpoints(queryset) 123 if should_cache: 124 provider = request.query_params.get("provider") 125 allowed_endpoints = cache.get(user_endpoint_cache_key(self.request.user.pk, provider)) 126 if not allowed_endpoints: 127 LOGGER.debug("Caching allowed endpoint list") 128 allowed_endpoints = self._get_allowed_endpoints(queryset) 129 cache.set( 130 user_endpoint_cache_key(self.request.user.pk, provider), 131 allowed_endpoints, 132 timeout=86400, 133 ) 134 serializer = self.get_serializer(allowed_endpoints, many=True) 135 return self.get_paginated_response(serializer.data)
List accessible endpoints