authentik.core.api.applications
Application API Views
"""Application API Views"""

from collections.abc import Iterator
from copy import copy

from django.core.cache import cache
from django.db.models import Case, QuerySet
from django.db.models.expressions import When
from django.shortcuts import get_object_or_404
from django.utils.translation import gettext as _
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from guardian.shortcuts import get_objects_for_user
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger

from authentik.api.pagination import Pagination
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.providers import ProviderSerializer
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
from authentik.core.apps import AppAccessWithoutBindings
from authentik.core.models import Application, User
from authentik.events.logs import LogEventSerializer, capture_logs
from authentik.policies.api.exec import PolicyTestResultSerializer
from authentik.policies.engine import PolicyEngine
from authentik.policies.types import CACHE_PREFIX, PolicyResult
from authentik.rbac.filters import ObjectFilter

LOGGER = get_logger()


def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
    """Cache key where application list for user is saved

    The key is ``{CACHE_PREFIX}app_access/{user_pk}``; a truthy ``page_number`` is
    appended as an additional ``/{page_number}`` segment.
    """
    key = f"{CACHE_PREFIX}app_access/{user_pk}"
    if page_number:
        key += f"/{page_number}"
    return key


class ApplicationSerializer(ModelSerializer):
    """Application Serializer"""

    launch_url = SerializerMethodField()
    provider_obj = ProviderSerializer(
        source="get_provider",
        required=False,
        read_only=True,
        allow_null=True,
    )
    backchannel_providers_obj = ProviderSerializer(
        source="backchannel_providers", required=False, read_only=True, many=True
    )

    meta_icon_url = ReadOnlyField(source="get_meta_icon")
    meta_icon_themed_urls = ThemedUrlsSerializer(
        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
    )

    def get_launch_url(self, app: Application) -> str | None:
        """Allow formatting of launch URL

        The requesting user (taken from the ``request`` entry of the serializer
        context, when present) and its serialized form are handed to
        ``app.get_launch_url``. Without a request, both are passed as ``None``.
        """
        user = None
        user_data = None

        if "request" in self.context:
            user = self.context["request"].user

        # Cache serialized user data to avoid N+1 when formatting launch URLs
        # for multiple applications. UserSerializer accesses user.groups which
        # would otherwise trigger a query for each application.
        if user is not None:
            if "_cached_user_data" not in self.context:
                # Serialize the user once and keep the result in the context
                self.context["_cached_user_data"] = UserSerializer(instance=user).data
            user_data = self.context["_cached_user_data"]

        return app.get_launch_url(user, user_data=user_data)

    def validate_slug(self, slug: str) -> str:
        """Reject slugs contained in ``Application.reserved_slugs``"""
        if slug in Application.reserved_slugs:
            raise ValidationError(
                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
                    slug=slug
                )
            )
        return slug

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # When the blueprint marker is in the context, expose `meta_icon` under the
        # additional field name `icon`
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["icon"] = CharField(source="meta_icon", required=False)

    class Meta:
        model = Application
        fields = [
            "pk",
            "name",
            "slug",
            "provider",
            "provider_obj",
            "backchannel_providers",
            "backchannel_providers_obj",
            "launch_url",
            "open_in_new_tab",
            "meta_launch_url",
            "meta_icon",
            "meta_icon_url",
            "meta_icon_themed_urls",
            "meta_description",
            "meta_publisher",
            "policy_engine_mode",
            "group",
        ]
        extra_kwargs = {
            "backchannel_providers": {"required": False},
        }


class ApplicationViewSet(UsedByMixin, ModelViewSet):
    """Application Viewset"""

    queryset = (
        Application.objects.all()
        .with_provider()
        .prefetch_related("policies")
        .prefetch_related("backchannel_providers")
    )
    serializer_class = ApplicationSerializer
    search_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    filterset_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    lookup_field = "slug"
    ordering = ["name"]

    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
        for backend in self.filter_backends:
            if backend == ObjectFilter:
                continue
            queryset = backend().filter_queryset(self.request, queryset, self)
        return queryset

    def _get_allowed_applications(
        self, paginated_apps: Iterator[Application], user: User | None = None
    ) -> list[Application]:
        """Return the applications from `paginated_apps` whose policy engine passes.

        If `user` is given, policies are evaluated for that user against a shallow copy
        of the underlying request, so the original request's user is left untouched.
        """
        applications = []
        request = self.request._request
        if user:
            request = copy(request)
            request.user = user
        for application in paginated_apps:
            engine = PolicyEngine(application, request.user, request)
            engine.empty_result = AppAccessWithoutBindings.get()
            engine.build()
            if engine.passing:
                applications.append(application)
        return applications

    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
        """
        Re-fetch with proper prefetching for serialization
        Cached applications don't have prefetched relationships, causing N+1 queries
        during serialization when get_provider() is called
        """
        if not applications:
            return self.get_queryset().none()
        pks = [app.pk for app in applications]
        return (
            self.get_queryset()
            .filter(pk__in=pks)
            # Keep the order of the input list
            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
        )

    def _filter_applications_with_launch_url(
        self, paginated_apps: QuerySet[Application]
    ) -> list[Application]:
        """Return only the applications for which `get_launch_url()` is truthy"""
        return [app for app in paginated_apps if app.get_launch_url()]

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            )
        ],
        responses={
            200: PolicyTestResultSerializer(),
        },
    )
    @action(detail=True, methods=["GET"])
    def check_access(self, request: Request, slug: str) -> Response:
        """Check access to a single application by slug"""
        # Don't use self.get_object as that checks for view_application permission
        # which the user might not have, even if they have access
        application = get_object_or_404(Application, slug=slug)
        # If the current user is superuser, they can set `for_user`
        for_user = request.user
        if request.user.is_superuser and "for_user" in request.query_params:
            try:
                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
            except ValueError:
                raise ValidationError({"for_user": "for_user must be numerical"}) from None
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
        engine = PolicyEngine(application, for_user, request)
        engine.empty_result = AppAccessWithoutBindings.get()
        engine.use_cache = False
        with capture_logs() as logs:
            engine.build()
            result = engine.result
            # Non-superusers only learn whether access passes
            response = PolicyTestResultSerializer(PolicyResult(False))
            if result.passing:
                response = PolicyTestResultSerializer(PolicyResult(True))
            if request.user.is_superuser:
                # Superusers get the full result with the captured log events, except
                # those whose "process" attribute is "PolicyProcess"
                result.log_messages = [
                    LogEventSerializer(log).data
                    for log in logs
                    if log.attributes.get("process", "") != "PolicyProcess"
                ]
                response = PolicyTestResultSerializer(result)
        return Response(response.data)

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="superuser_full_list",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            ),
            OpenApiParameter(
                name="only_with_launch_url",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
        ]
    )
    def list(self, request: Request) -> Response:
        """Custom list method that checks Policy based access instead of guardian

        Query parameters:
            superuser_full_list: when "true" and the requester is a superuser, delegate
                to the parent class's ``list()`` instead of evaluating policies.
            for_user: primary key of another user to evaluate access for. The lookup is
                limited to the users returned by ``get_objects_for_user`` for the
                ``authentik_core.view_user_applications`` permission.
            only_with_launch_url: when "true", drop applications whose
                ``get_launch_url()`` is falsy (not applied when ``for_user`` is set).

        Raises:
            ValidationError: if ``for_user`` is not an integer or matches no user.
        """
        # Results are only cached when no search term is given.
        # NOTE(review): the cache key only varies by user and page number, so requests
        # using other query parameters that change the queryset (e.g. the
        # `filterset_fields` filters) appear to share entries with the unfiltered
        # list -- confirm whether that is intended.
        should_cache = request.query_params.get("search", "") == ""

        superuser_full_list = (
            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
        )
        if superuser_full_list and request.user.is_superuser:
            return super().list(request)

        only_with_launch_url = str(
            request.query_params.get("only_with_launch_url", "false")
        ).lower()

        queryset = self._filter_queryset_for_list(self.get_queryset())
        paginator: Pagination = self.paginator
        paginated_apps = paginator.paginate_queryset(queryset, request)

        if "for_user" in request.query_params:
            try:
                for_user_pk = int(request.query_params.get("for_user", 0))
                for_user = (
                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
                    .filter(pk=for_user_pk)
                    .first()
                )
            except ValueError as exc:
                # Same field-specific message as `check_access`
                raise ValidationError({"for_user": "for_user must be numerical"}) from exc
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
            allowed_applications = self._expand_applications(allowed_applications)

            serializer = self.get_serializer(allowed_applications, many=True)
            return self.get_paginated_response(serializer.data)

        if not should_cache:
            allowed_applications = self._get_allowed_applications(paginated_apps)
        else:
            cache_key = user_app_cache_key(self.request.user.pk, paginator.page.number)
            allowed_applications = cache.get(cache_key)
            # Only `None` is a cache miss. An empty list is a valid cached result
            # ("no application on this page is allowed") and must not trigger a
            # policy re-evaluation on every request.
            if allowed_applications is None:
                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
                allowed_applications = self._get_allowed_applications(paginated_apps)
                # 86400 seconds = 24 hours
                cache.set(cache_key, allowed_applications, timeout=86400)
            allowed_applications = self._expand_applications(allowed_applications)

        if only_with_launch_url == "true":
            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)

        serializer = self.get_serializer(allowed_applications, many=True)
        return self.get_paginated_response(serializer.data)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
    """Build the cache key under which a user's allowed application list is stored.

    The base key is ``{CACHE_PREFIX}app_access/{user_pk}``. When ``page_number`` is
    truthy it is appended as a further ``/{page_number}`` path segment.
    """
    base_key = f"{CACHE_PREFIX}app_access/{user_pk}"
    return f"{base_key}/{page_number}" if page_number else base_key
Cache key where application list for user is saved
class ApplicationSerializer(ModelSerializer):
    """Serializer for Application objects.

    Besides the model fields, it exposes the formatted launch URL, the nested
    provider and backchannel provider objects, and the icon URLs.
    """

    launch_url = SerializerMethodField()
    provider_obj = ProviderSerializer(
        source="get_provider",
        required=False,
        read_only=True,
        allow_null=True,
    )
    backchannel_providers_obj = ProviderSerializer(
        source="backchannel_providers", required=False, read_only=True, many=True
    )

    meta_icon_url = ReadOnlyField(source="get_meta_icon")
    meta_icon_themed_urls = ThemedUrlsSerializer(
        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
    )

    def get_launch_url(self, app: Application) -> str | None:
        """Return the launch URL of `app`, formatted for the requesting user.

        Without a ``request`` in the serializer context (or without a user on it),
        the URL is requested with no user and no user data.
        """
        requesting_user = self.context["request"].user if "request" in self.context else None
        if requesting_user is None:
            return app.get_launch_url(None, user_data=None)

        # The serialized user is stored in the shared context so it is built once
        # rather than once per application; UserSerializer accesses user.groups,
        # which would otherwise trigger a query for each application.
        context_key = "_cached_user_data"
        if context_key not in self.context:
            self.context[context_key] = UserSerializer(instance=requesting_user).data
        return app.get_launch_url(requesting_user, user_data=self.context[context_key])

    def validate_slug(self, slug: str) -> str:
        """Return `slug` unchanged unless it is one of ``Application.reserved_slugs``."""
        if slug not in Application.reserved_slugs:
            return slug
        message = _("The slug '{slug}' is reserved and cannot be used for applications.")
        raise ValidationError(message.format(slug=slug))

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        if SERIALIZER_CONTEXT_BLUEPRINT not in self.context:
            return
        # With the blueprint marker in the context, `meta_icon` is additionally
        # exposed under the field name `icon`.
        self.fields["icon"] = CharField(source="meta_icon", required=False)

    class Meta:
        model = Application
        fields = [
            "pk",
            "name",
            "slug",
            "provider",
            "provider_obj",
            "backchannel_providers",
            "backchannel_providers_obj",
            "launch_url",
            "open_in_new_tab",
            "meta_launch_url",
            "meta_icon",
            "meta_icon_url",
            "meta_icon_themed_urls",
            "meta_description",
            "meta_publisher",
            "policy_engine_mode",
            "group",
        ]
        extra_kwargs = {
            "backchannel_providers": {"required": False},
        }
Application Serializer
67 def get_launch_url(self, app: Application) -> str | None: 68 """Allow formatting of launch URL""" 69 user = None 70 user_data = None 71 72 if "request" in self.context: 73 user = self.context["request"].user 74 75 # Cache serialized user data to avoid N+1 when formatting launch URLs 76 # for multiple applications. UserSerializer accesses user.groups which 77 # would otherwise trigger a query for each application. 78 if user is not None: 79 if "_cached_user_data" not in self.context: 80 # Prefetch groups to avoid N+1 81 self.context["_cached_user_data"] = UserSerializer(instance=user).data 82 user_data = self.context["_cached_user_data"] 83 84 return app.get_launch_url(user, user_data=user_data)
Allow formatting of launch URL
Inherited Members
class
ApplicationSerializer.Meta:
    class Meta:
        """Serializer options for the Application serializer"""

        model = Application
        # Names exposed by the serializer, in output order. This mixes Application
        # attributes with the fields declared on the serializer itself
        # (provider_obj, backchannel_providers_obj, launch_url, meta_icon_url,
        # meta_icon_themed_urls).
        fields = [
            "pk",
            "name",
            "slug",
            "provider",
            "provider_obj",
            "backchannel_providers",
            "backchannel_providers_obj",
            "launch_url",
            "open_in_new_tab",
            "meta_launch_url",
            "meta_icon",
            "meta_icon_url",
            "meta_icon_themed_urls",
            "meta_description",
            "meta_publisher",
            "policy_engine_mode",
            "group",
        ]
        # `backchannel_providers` is marked as not required in input data
        extra_kwargs = {
            "backchannel_providers": {"required": False},
        }
model =
<class 'authentik.core.models.Application'>
class
ApplicationViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class ApplicationViewSet(UsedByMixin, ModelViewSet):
    """Application Viewset

    The list endpoint decides visibility by evaluating each application's policies
    instead of relying on guardian object permissions.
    """

    queryset = (
        Application.objects.all()
        .with_provider()
        .prefetch_related("policies")
        .prefetch_related("backchannel_providers")
    )
    serializer_class = ApplicationSerializer
    search_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    filterset_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    lookup_field = "slug"
    ordering = ["name"]

    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
        for backend in self.filter_backends:
            if backend == ObjectFilter:
                continue
            queryset = backend().filter_queryset(self.request, queryset, self)
        return queryset

    def _get_allowed_applications(
        self, paginated_apps: Iterator[Application], user: User | None = None
    ) -> list[Application]:
        """Return the applications from `paginated_apps` whose policy engine passes.

        If `user` is given, policies are evaluated for that user against a shallow copy
        of the underlying request, so the original request's user is left untouched.
        """
        applications = []
        request = self.request._request
        if user:
            request = copy(request)
            request.user = user
        for application in paginated_apps:
            engine = PolicyEngine(application, request.user, request)
            engine.empty_result = AppAccessWithoutBindings.get()
            engine.build()
            if engine.passing:
                applications.append(application)
        return applications

    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
        """
        Re-fetch with proper prefetching for serialization
        Cached applications don't have prefetched relationships, causing N+1 queries
        during serialization when get_provider() is called
        """
        if not applications:
            return self.get_queryset().none()
        pks = [app.pk for app in applications]
        return (
            self.get_queryset()
            .filter(pk__in=pks)
            # Keep the order of the input list
            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
        )

    def _filter_applications_with_launch_url(
        self, paginated_apps: QuerySet[Application]
    ) -> list[Application]:
        """Return only the applications for which `get_launch_url()` is truthy"""
        return [app for app in paginated_apps if app.get_launch_url()]

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            )
        ],
        responses={
            200: PolicyTestResultSerializer(),
        },
    )
    @action(detail=True, methods=["GET"])
    def check_access(self, request: Request, slug: str) -> Response:
        """Check access to a single application by slug"""
        # Don't use self.get_object as that checks for view_application permission
        # which the user might not have, even if they have access
        application = get_object_or_404(Application, slug=slug)
        # If the current user is superuser, they can set `for_user`
        for_user = request.user
        if request.user.is_superuser and "for_user" in request.query_params:
            try:
                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
            except ValueError:
                raise ValidationError({"for_user": "for_user must be numerical"}) from None
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
        engine = PolicyEngine(application, for_user, request)
        engine.empty_result = AppAccessWithoutBindings.get()
        engine.use_cache = False
        with capture_logs() as logs:
            engine.build()
            result = engine.result
            # Non-superusers only learn whether access passes
            response = PolicyTestResultSerializer(PolicyResult(False))
            if result.passing:
                response = PolicyTestResultSerializer(PolicyResult(True))
            if request.user.is_superuser:
                # Superusers get the full result with the captured log events, except
                # those whose "process" attribute is "PolicyProcess"
                result.log_messages = [
                    LogEventSerializer(log).data
                    for log in logs
                    if log.attributes.get("process", "") != "PolicyProcess"
                ]
                response = PolicyTestResultSerializer(result)
        return Response(response.data)

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="superuser_full_list",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            ),
            OpenApiParameter(
                name="only_with_launch_url",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
        ]
    )
    def list(self, request: Request) -> Response:
        """Custom list method that checks Policy based access instead of guardian

        Query parameters:
            superuser_full_list: when "true" and the requester is a superuser, delegate
                to the parent class's ``list()`` instead of evaluating policies.
            for_user: primary key of another user to evaluate access for. The lookup is
                limited to the users returned by ``get_objects_for_user`` for the
                ``authentik_core.view_user_applications`` permission.
            only_with_launch_url: when "true", drop applications whose
                ``get_launch_url()`` is falsy (not applied when ``for_user`` is set).

        Raises:
            ValidationError: if ``for_user`` is not an integer or matches no user.
        """
        # Results are only cached when no search term is given.
        # NOTE(review): the cache key only varies by user and page number, so requests
        # using other query parameters that change the queryset (e.g. the
        # `filterset_fields` filters) appear to share entries with the unfiltered
        # list -- confirm whether that is intended.
        should_cache = request.query_params.get("search", "") == ""

        superuser_full_list = (
            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
        )
        if superuser_full_list and request.user.is_superuser:
            return super().list(request)

        only_with_launch_url = str(
            request.query_params.get("only_with_launch_url", "false")
        ).lower()

        queryset = self._filter_queryset_for_list(self.get_queryset())
        paginator: Pagination = self.paginator
        paginated_apps = paginator.paginate_queryset(queryset, request)

        if "for_user" in request.query_params:
            try:
                for_user_pk = int(request.query_params.get("for_user", 0))
                for_user = (
                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
                    .filter(pk=for_user_pk)
                    .first()
                )
            except ValueError as exc:
                # Same field-specific message as `check_access`
                raise ValidationError({"for_user": "for_user must be numerical"}) from exc
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
            allowed_applications = self._expand_applications(allowed_applications)

            serializer = self.get_serializer(allowed_applications, many=True)
            return self.get_paginated_response(serializer.data)

        if not should_cache:
            allowed_applications = self._get_allowed_applications(paginated_apps)
        else:
            cache_key = user_app_cache_key(self.request.user.pk, paginator.page.number)
            allowed_applications = cache.get(cache_key)
            # Only `None` is a cache miss. An empty list is a valid cached result
            # ("no application on this page is allowed") and must not trigger a
            # policy re-evaluation on every request.
            if allowed_applications is None:
                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
                allowed_applications = self._get_allowed_applications(paginated_apps)
                # 86400 seconds = 24 hours
                cache.set(cache_key, allowed_applications, timeout=86400)
            allowed_applications = self._expand_applications(allowed_applications)

        if only_with_launch_url == "true":
            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)

        serializer = self.get_serializer(allowed_applications, many=True)
        return self.get_paginated_response(serializer.data)
Application Viewset
serializer_class =
<class 'ApplicationSerializer'>
filterset_fields =
['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
@extend_schema(parameters=[OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT)], responses={200: PolicyTestResultSerializer()})
@action(detail=True, methods=['GET'])
def
check_access( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
203 @extend_schema( 204 parameters=[ 205 OpenApiParameter( 206 name="for_user", 207 location=OpenApiParameter.QUERY, 208 type=OpenApiTypes.INT, 209 ) 210 ], 211 responses={ 212 200: PolicyTestResultSerializer(), 213 }, 214 ) 215 @action(detail=True, methods=["GET"]) 216 def check_access(self, request: Request, slug: str) -> Response: 217 """Check access to a single application by slug""" 218 # Don't use self.get_object as that checks for view_application permission 219 # which the user might not have, even if they have access 220 application = get_object_or_404(Application, slug=slug) 221 # If the current user is superuser, they can set `for_user` 222 for_user = request.user 223 if request.user.is_superuser and "for_user" in request.query_params: 224 try: 225 for_user = User.objects.filter(pk=request.query_params.get("for_user")).first() 226 except ValueError: 227 raise ValidationError({"for_user": "for_user must be numerical"}) from None 228 if not for_user: 229 raise ValidationError({"for_user": "User not found"}) 230 engine = PolicyEngine(application, for_user, request) 231 engine.empty_result = AppAccessWithoutBindings.get() 232 engine.use_cache = False 233 with capture_logs() as logs: 234 engine.build() 235 result = engine.result 236 response = PolicyTestResultSerializer(PolicyResult(False)) 237 if result.passing: 238 response = PolicyTestResultSerializer(PolicyResult(True)) 239 if request.user.is_superuser: 240 log_messages = [] 241 for log in logs: 242 if log.attributes.get("process", "") == "PolicyProcess": 243 continue 244 log_messages.append(LogEventSerializer(log).data) 245 result.log_messages = log_messages 246 response = PolicyTestResultSerializer(result) 247 return Response(response.data)
Check access to a single application by slug
@extend_schema(parameters=[OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL), OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT), OpenApiParameter(name='only_with_launch_url', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
def
list( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
249 @extend_schema( 250 parameters=[ 251 OpenApiParameter( 252 name="superuser_full_list", 253 location=OpenApiParameter.QUERY, 254 type=OpenApiTypes.BOOL, 255 ), 256 OpenApiParameter( 257 name="for_user", 258 location=OpenApiParameter.QUERY, 259 type=OpenApiTypes.INT, 260 ), 261 OpenApiParameter( 262 name="only_with_launch_url", 263 location=OpenApiParameter.QUERY, 264 type=OpenApiTypes.BOOL, 265 ), 266 ] 267 ) 268 def list(self, request: Request) -> Response: 269 """Custom list method that checks Policy based access instead of guardian""" 270 should_cache = request.query_params.get("search", "") == "" 271 272 superuser_full_list = ( 273 str(request.query_params.get("superuser_full_list", "false")).lower() == "true" 274 ) 275 if superuser_full_list and request.user.is_superuser: 276 return super().list(request) 277 278 only_with_launch_url = str( 279 request.query_params.get("only_with_launch_url", "false") 280 ).lower() 281 282 queryset = self._filter_queryset_for_list(self.get_queryset()) 283 paginator: Pagination = self.paginator 284 paginated_apps = paginator.paginate_queryset(queryset, request) 285 286 if "for_user" in request.query_params: 287 try: 288 for_user: int = int(request.query_params.get("for_user", 0)) 289 for_user = ( 290 get_objects_for_user(request.user, "authentik_core.view_user_applications") 291 .filter(pk=for_user) 292 .first() 293 ) 294 if not for_user: 295 raise ValidationError({"for_user": "User not found"}) 296 except ValueError as exc: 297 raise ValidationError from exc 298 allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user) 299 allowed_applications = self._expand_applications(allowed_applications) 300 301 serializer = self.get_serializer(allowed_applications, many=True) 302 return self.get_paginated_response(serializer.data) 303 304 allowed_applications = [] 305 if not should_cache: 306 allowed_applications = self._get_allowed_applications(paginated_apps) 307 if should_cache: 308 allowed_applications = 
cache.get( 309 user_app_cache_key(self.request.user.pk, paginator.page.number) 310 ) 311 if not allowed_applications: 312 LOGGER.debug("Caching allowed application list", page=paginator.page.number) 313 allowed_applications = self._get_allowed_applications(paginated_apps) 314 cache.set( 315 user_app_cache_key(self.request.user.pk, paginator.page.number), 316 allowed_applications, 317 timeout=86400, 318 ) 319 allowed_applications = self._expand_applications(allowed_applications) 320 321 if only_with_launch_url == "true": 322 allowed_applications = self._filter_applications_with_launch_url(allowed_applications) 323 324 serializer = self.get_serializer(allowed_applications, many=True) 325 return self.get_paginated_response(serializer.data)
Custom list method that checks Policy based access instead of guardian