authentik.core.api.applications
Application API Views
1"""Application API Views""" 2 3from collections.abc import Iterator 4from copy import copy 5 6from django.core.cache import cache 7from django.db.models import Case, QuerySet 8from django.db.models.expressions import When 9from django.shortcuts import get_object_or_404 10from django.utils.translation import gettext as _ 11from drf_spectacular.types import OpenApiTypes 12from drf_spectacular.utils import OpenApiParameter, extend_schema 13from guardian.shortcuts import get_objects_for_user 14from rest_framework.decorators import action 15from rest_framework.exceptions import ValidationError 16from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField 17from rest_framework.request import Request 18from rest_framework.response import Response 19from rest_framework.viewsets import ModelViewSet 20from structlog.stdlib import get_logger 21 22from authentik.api.pagination import Pagination 23from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT 24from authentik.core.api.providers import ProviderSerializer 25from authentik.core.api.used_by import UsedByMixin 26from authentik.core.api.users import UserSerializer 27from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer 28from authentik.core.apps import AppAccessWithoutBindings 29from authentik.core.models import Application, User 30from authentik.events.logs import LogEventSerializer, capture_logs 31from authentik.policies.api.exec import PolicyTestResultSerializer 32from authentik.policies.engine import PolicyEngine 33from authentik.policies.types import CACHE_PREFIX, PolicyResult 34from authentik.rbac.filters import ObjectFilter 35 36LOGGER = get_logger() 37 38 39def user_app_cache_key( 40 user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False 41) -> str: 42 """Cache key where application list for user is saved""" 43 key = f"{CACHE_PREFIX}app_access/{user_pk}" 44 if only_with_launch_url: 45 key += "/launch" 46 if page_number: 47 key += f"/{page_number}" 48 return key 49 50 51class ApplicationSerializer(ModelSerializer): 52 """Application Serializer""" 53 54 launch_url = SerializerMethodField() 55 provider_obj = ProviderSerializer( 56 source="get_provider", 57 required=False, 58 read_only=True, 59 allow_null=True, 60 ) 61 backchannel_providers_obj = ProviderSerializer( 62 source="backchannel_providers", required=False, read_only=True, many=True 63 ) 64 65 meta_icon_url = ReadOnlyField(source="get_meta_icon") 66 meta_icon_themed_urls = ThemedUrlsSerializer( 67 source="get_meta_icon_themed_urls", read_only=True, allow_null=True 68 ) 69 70 def get_launch_url(self, app: Application) -> str | None: 71 """Allow formatting of launch URL""" 72 user = None 73 user_data = None 74 75 if "request" in self.context: 76 user = self.context["request"].user 77 78 # Cache serialized user data to avoid N+1 when formatting launch URLs 79 # for multiple applications. UserSerializer accesses user.groups which 80 # would otherwise trigger a query for each application. 81 if user is not None: 82 if "_cached_user_data" not in self.context: 83 # Prefetch groups to avoid N+1 84 self.context["_cached_user_data"] = UserSerializer(instance=user).data 85 user_data = self.context["_cached_user_data"] 86 87 return app.get_launch_url(user, user_data=user_data) 88 89 def validate_slug(self, slug: str) -> str: 90 if slug in Application.reserved_slugs: 91 raise ValidationError( 92 _("The slug '{slug}' is reserved and cannot be used for applications.").format( 93 slug=slug 94 ) 95 ) 96 return slug 97 98 def __init__(self, *args, **kwargs) -> None: 99 super().__init__(*args, **kwargs) 100 if SERIALIZER_CONTEXT_BLUEPRINT in self.context: 101 self.fields["icon"] = CharField(source="meta_icon", required=False) 102 103 class Meta: 104 model = Application 105 fields = [ 106 "pk", 107 "name", 108 "slug", 109 "provider", 110 "provider_obj", 111 "backchannel_providers", 112 "backchannel_providers_obj", 113 "launch_url", 114 "open_in_new_tab", 115 "meta_launch_url", 116 "meta_icon", 117 "meta_icon_url", 118 "meta_icon_themed_urls", 119 "meta_description", 120 "meta_publisher", 121 "policy_engine_mode", 122 "group", 123 "meta_hide", 124 ] 125 extra_kwargs = { 126 "backchannel_providers": {"required": False}, 127 } 128 129 130class ApplicationViewSet(UsedByMixin, ModelViewSet): 131 """Application Viewset""" 132 133 queryset = ( 134 Application.objects.all() 135 .with_provider() 136 .prefetch_related("policies") 137 .prefetch_related("backchannel_providers") 138 ) 139 serializer_class = ApplicationSerializer 140 search_fields = [ 141 "name", 142 "slug", 143 "meta_launch_url", 144 "meta_description", 145 "meta_publisher", 146 "group", 147 ] 148 filterset_fields = [ 149 "name", 150 "slug", 151 "meta_launch_url", 152 "meta_description", 153 "meta_publisher", 154 "group", 155 ] 156 lookup_field = "slug" 157 ordering = ["name"] 158 159 def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet: 160 """Custom filter_queryset method which ignores guardian, but still supports sorting""" 161 for backend in list(self.filter_backends): 162 if backend == ObjectFilter: 163 continue 164 queryset = backend().filter_queryset(self.request, queryset, self) 165 return queryset 166 167 def _get_allowed_applications( 168 self, paginated_apps: Iterator[Application], user: User | None = None 169 ) -> list[Application]: 170 applications = [] 171 request = self.request._request 172 if user: 173 request = copy(request) 174 request.user = user 175 for application in paginated_apps: 176 engine = PolicyEngine(application, request.user, request) 177 engine.empty_result = AppAccessWithoutBindings.get() 178 engine.build() 179 if engine.passing: 180 applications.append(application) 181 return applications 182 183 def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]: 184 """ 185 Re-fetch with proper prefetching for serialization 186 Cached applications don't have prefetched relationships, causing N+1 queries 187 during serialization when get_provider() is called 188 """ 189 if not applications: 190 return self.get_queryset().none() 191 pks = [app.pk for app in applications] 192 return ( 193 self.get_queryset() 194 .filter(pk__in=pks) 195 .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)])) 196 ) 197 198 def _filter_applications_with_launch_url( 199 self, paginated_apps: QuerySet[Application] 200 ) -> list[Application]: 201 applications = [] 202 for app in paginated_apps: 203 if app.get_launch_url(): 204 applications.append(app) 205 return applications 206 207 @extend_schema( 208 parameters=[ 209 OpenApiParameter( 210 name="for_user", 211 location=OpenApiParameter.QUERY, 212 type=OpenApiTypes.INT, 213 ) 214 ], 215 responses={ 216 200: PolicyTestResultSerializer(), 217 }, 218 ) 219 @action(detail=True, methods=["GET"]) 220 def check_access(self, request: Request, slug: str) -> Response: 221 """Check access to a single application by slug""" 222 # Don't use self.get_object as that checks for view_application permission 223 # which the user might not have, even if they have access 224 application = get_object_or_404(Application, slug=slug) 225 # If the current user is superuser, they can set `for_user` 226 for_user = request.user 227 if request.user.is_superuser and "for_user" in request.query_params: 228 try: 229 for_user = User.objects.filter(pk=request.query_params.get("for_user")).first() 230 except ValueError: 231 raise ValidationError({"for_user": "for_user must be numerical"}) from None 232 if not for_user: 233 raise ValidationError({"for_user": "User not found"}) 234 engine = PolicyEngine(application, for_user, request) 235 engine.empty_result = AppAccessWithoutBindings.get() 236 engine.use_cache = False 237 with capture_logs() as logs: 238 engine.build() 239 result = engine.result 240 response = PolicyTestResultSerializer(PolicyResult(False)) 241 if result.passing: 242 response = PolicyTestResultSerializer(PolicyResult(True)) 243 if request.user.is_superuser: 244 log_messages = [] 245 for log in logs: 246 if log.attributes.get("process", "") == "PolicyProcess": 247 continue 248 log_messages.append(LogEventSerializer(log).data) 249 result.log_messages = log_messages 250 response = PolicyTestResultSerializer(result) 251 return Response(response.data) 252 253 @extend_schema( 254 parameters=[ 255 OpenApiParameter( 256 name="superuser_full_list", 257 location=OpenApiParameter.QUERY, 258 type=OpenApiTypes.BOOL, 259 ), 260 OpenApiParameter( 261 name="for_user", 262 location=OpenApiParameter.QUERY, 263 type=OpenApiTypes.INT, 264 ), 265 OpenApiParameter( 266 name="only_with_launch_url", 267 location=OpenApiParameter.QUERY, 268 type=OpenApiTypes.BOOL, 269 ), 270 ] 271 ) 272 def list(self, request: Request) -> Response: 273 """Custom list method that checks Policy based access instead of guardian""" 274 should_cache = request.query_params.get("search", "") == "" 275 276 superuser_full_list = ( 277 str(request.query_params.get("superuser_full_list", "false")).lower() == "true" 278 ) 279 if superuser_full_list and request.user.is_superuser: 280 return super().list(request) 281 282 only_with_launch_url = ( 283 str(request.query_params.get("only_with_launch_url", "false")).lower() 284 ) == "true" 285 286 queryset = self._filter_queryset_for_list(self.get_queryset()) 287 queryset = queryset.exclude(meta_hide=True) 288 if only_with_launch_url: 289 # Pre-filter at DB level to skip expensive per-app policy evaluation 290 # for apps that can never appear in the launcher (no meta_launch_url 291 # and no provider, so no possible launch URL). 292 queryset = queryset.exclude(meta_launch_url="", provider__isnull=True) 293 paginator: Pagination = self.paginator 294 paginated_apps = paginator.paginate_queryset(queryset, request) 295 296 if "for_user" in request.query_params: 297 try: 298 for_user: int = int(request.query_params.get("for_user", 0)) 299 for_user = ( 300 get_objects_for_user(request.user, "authentik_core.view_user_applications") 301 .filter(pk=for_user) 302 .first() 303 ) 304 if not for_user: 305 raise ValidationError({"for_user": "User not found"}) 306 except ValueError as exc: 307 raise ValidationError from exc 308 allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user) 309 310 serializer = self.get_serializer(allowed_applications, many=True) 311 return self.get_paginated_response(serializer.data) 312 313 allowed_applications = [] 314 if not should_cache: 315 allowed_applications = self._get_allowed_applications(paginated_apps) 316 if should_cache: 317 allowed_applications = cache.get( 318 user_app_cache_key( 319 self.request.user.pk, paginator.page.number, only_with_launch_url 320 ) 321 ) 322 if allowed_applications: 323 # Re-fetch cached applications since pickled instances lose prefetched 324 # relationships, causing N+1 queries during serialization 325 allowed_applications = self._expand_applications(allowed_applications) 326 else: 327 LOGGER.debug("Caching allowed application list", page=paginator.page.number) 328 allowed_applications = self._get_allowed_applications(paginated_apps) 329 cache.set( 330 user_app_cache_key( 331 self.request.user.pk, paginator.page.number, only_with_launch_url 332 ), 333 allowed_applications, 334 timeout=86400, 335 ) 336 337 if only_with_launch_url: 338 allowed_applications = self._filter_applications_with_launch_url(allowed_applications) 339 340 serializer = self.get_serializer(allowed_applications, many=True) 341 return self.get_paginated_response(serializer.data)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def
user_app_cache_key( user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False) -> str:
40def user_app_cache_key( 41 user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False 42) -> str: 43 """Cache key where application list for user is saved""" 44 key = f"{CACHE_PREFIX}app_access/{user_pk}" 45 if only_with_launch_url: 46 key += "/launch" 47 if page_number: 48 key += f"/{page_number}" 49 return key
Cache key where application list for user is saved
52class ApplicationSerializer(ModelSerializer): 53 """Application Serializer""" 54 55 launch_url = SerializerMethodField() 56 provider_obj = ProviderSerializer( 57 source="get_provider", 58 required=False, 59 read_only=True, 60 allow_null=True, 61 ) 62 backchannel_providers_obj = ProviderSerializer( 63 source="backchannel_providers", required=False, read_only=True, many=True 64 ) 65 66 meta_icon_url = ReadOnlyField(source="get_meta_icon") 67 meta_icon_themed_urls = ThemedUrlsSerializer( 68 source="get_meta_icon_themed_urls", read_only=True, allow_null=True 69 ) 70 71 def get_launch_url(self, app: Application) -> str | None: 72 """Allow formatting of launch URL""" 73 user = None 74 user_data = None 75 76 if "request" in self.context: 77 user = self.context["request"].user 78 79 # Cache serialized user data to avoid N+1 when formatting launch URLs 80 # for multiple applications. UserSerializer accesses user.groups which 81 # would otherwise trigger a query for each application. 82 if user is not None: 83 if "_cached_user_data" not in self.context: 84 # Prefetch groups to avoid N+1 85 self.context["_cached_user_data"] = UserSerializer(instance=user).data 86 user_data = self.context["_cached_user_data"] 87 88 return app.get_launch_url(user, user_data=user_data) 89 90 def validate_slug(self, slug: str) -> str: 91 if slug in Application.reserved_slugs: 92 raise ValidationError( 93 _("The slug '{slug}' is reserved and cannot be used for applications.").format( 94 slug=slug 95 ) 96 ) 97 return slug 98 99 def __init__(self, *args, **kwargs) -> None: 100 super().__init__(*args, **kwargs) 101 if SERIALIZER_CONTEXT_BLUEPRINT in self.context: 102 self.fields["icon"] = CharField(source="meta_icon", required=False) 103 104 class Meta: 105 model = Application 106 fields = [ 107 "pk", 108 "name", 109 "slug", 110 "provider", 111 "provider_obj", 112 "backchannel_providers", 113 "backchannel_providers_obj", 114 "launch_url", 115 "open_in_new_tab", 116 "meta_launch_url", 117 "meta_icon", 118 "meta_icon_url", 119 "meta_icon_themed_urls", 120 "meta_description", 121 "meta_publisher", 122 "policy_engine_mode", 123 "group", 124 "meta_hide", 125 ] 126 extra_kwargs = { 127 "backchannel_providers": {"required": False}, 128 }
Application Serializer
71 def get_launch_url(self, app: Application) -> str | None: 72 """Allow formatting of launch URL""" 73 user = None 74 user_data = None 75 76 if "request" in self.context: 77 user = self.context["request"].user 78 79 # Cache serialized user data to avoid N+1 when formatting launch URLs 80 # for multiple applications. UserSerializer accesses user.groups which 81 # would otherwise trigger a query for each application. 82 if user is not None: 83 if "_cached_user_data" not in self.context: 84 # Prefetch groups to avoid N+1 85 self.context["_cached_user_data"] = UserSerializer(instance=user).data 86 user_data = self.context["_cached_user_data"] 87 88 return app.get_launch_url(user, user_data=user_data)
Allow formatting of launch URL
Inherited Members
class
ApplicationSerializer.Meta:
104 class Meta: 105 model = Application 106 fields = [ 107 "pk", 108 "name", 109 "slug", 110 "provider", 111 "provider_obj", 112 "backchannel_providers", 113 "backchannel_providers_obj", 114 "launch_url", 115 "open_in_new_tab", 116 "meta_launch_url", 117 "meta_icon", 118 "meta_icon_url", 119 "meta_icon_themed_urls", 120 "meta_description", 121 "meta_publisher", 122 "policy_engine_mode", 123 "group", 124 "meta_hide", 125 ] 126 extra_kwargs = { 127 "backchannel_providers": {"required": False}, 128 }
model =
<class 'authentik.core.models.Application'>
fields =
['pk', 'name', 'slug', 'provider', 'provider_obj', 'backchannel_providers', 'backchannel_providers_obj', 'launch_url', 'open_in_new_tab', 'meta_launch_url', 'meta_icon', 'meta_icon_url', 'meta_icon_themed_urls', 'meta_description', 'meta_publisher', 'policy_engine_mode', 'group', 'meta_hide']
class
ApplicationViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
131class ApplicationViewSet(UsedByMixin, ModelViewSet): 132 """Application Viewset""" 133 134 queryset = ( 135 Application.objects.all() 136 .with_provider() 137 .prefetch_related("policies") 138 .prefetch_related("backchannel_providers") 139 ) 140 serializer_class = ApplicationSerializer 141 search_fields = [ 142 "name", 143 "slug", 144 "meta_launch_url", 145 "meta_description", 146 "meta_publisher", 147 "group", 148 ] 149 filterset_fields = [ 150 "name", 151 "slug", 152 "meta_launch_url", 153 "meta_description", 154 "meta_publisher", 155 "group", 156 ] 157 lookup_field = "slug" 158 ordering = ["name"] 159 160 def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet: 161 """Custom filter_queryset method which ignores guardian, but still supports sorting""" 162 for backend in list(self.filter_backends): 163 if backend == ObjectFilter: 164 continue 165 queryset = backend().filter_queryset(self.request, queryset, self) 166 return queryset 167 168 def _get_allowed_applications( 169 self, paginated_apps: Iterator[Application], user: User | None = None 170 ) -> list[Application]: 171 applications = [] 172 request = self.request._request 173 if user: 174 request = copy(request) 175 request.user = user 176 for application in paginated_apps: 177 engine = PolicyEngine(application, request.user, request) 178 engine.empty_result = AppAccessWithoutBindings.get() 179 engine.build() 180 if engine.passing: 181 applications.append(application) 182 return applications 183 184 def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]: 185 """ 186 Re-fetch with proper prefetching for serialization 187 Cached applications don't have prefetched relationships, causing N+1 queries 188 during serialization when get_provider() is called 189 """ 190 if not applications: 191 return self.get_queryset().none() 192 pks = [app.pk for app in applications] 193 return ( 194 self.get_queryset() 195 .filter(pk__in=pks) 196 .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)])) 197 ) 198 199 def _filter_applications_with_launch_url( 200 self, paginated_apps: QuerySet[Application] 201 ) -> list[Application]: 202 applications = [] 203 for app in paginated_apps: 204 if app.get_launch_url(): 205 applications.append(app) 206 return applications 207 208 @extend_schema( 209 parameters=[ 210 OpenApiParameter( 211 name="for_user", 212 location=OpenApiParameter.QUERY, 213 type=OpenApiTypes.INT, 214 ) 215 ], 216 responses={ 217 200: PolicyTestResultSerializer(), 218 }, 219 ) 220 @action(detail=True, methods=["GET"]) 221 def check_access(self, request: Request, slug: str) -> Response: 222 """Check access to a single application by slug""" 223 # Don't use self.get_object as that checks for view_application permission 224 # which the user might not have, even if they have access 225 application = get_object_or_404(Application, slug=slug) 226 # If the current user is superuser, they can set `for_user` 227 for_user = request.user 228 if request.user.is_superuser and "for_user" in request.query_params: 229 try: 230 for_user = User.objects.filter(pk=request.query_params.get("for_user")).first() 231 except ValueError: 232 raise ValidationError({"for_user": "for_user must be numerical"}) from None 233 if not for_user: 234 raise ValidationError({"for_user": "User not found"}) 235 engine = PolicyEngine(application, for_user, request) 236 engine.empty_result = AppAccessWithoutBindings.get() 237 engine.use_cache = False 238 with capture_logs() as logs: 239 engine.build() 240 result = engine.result 241 response = PolicyTestResultSerializer(PolicyResult(False)) 242 if result.passing: 243 response = PolicyTestResultSerializer(PolicyResult(True)) 244 if request.user.is_superuser: 245 log_messages = [] 246 for log in logs: 247 if log.attributes.get("process", "") == "PolicyProcess": 248 continue 249 log_messages.append(LogEventSerializer(log).data) 250 result.log_messages = log_messages 251 response = PolicyTestResultSerializer(result) 252 return Response(response.data) 253 254 @extend_schema( 255 parameters=[ 256 OpenApiParameter( 257 name="superuser_full_list", 258 location=OpenApiParameter.QUERY, 259 type=OpenApiTypes.BOOL, 260 ), 261 OpenApiParameter( 262 name="for_user", 263 location=OpenApiParameter.QUERY, 264 type=OpenApiTypes.INT, 265 ), 266 OpenApiParameter( 267 name="only_with_launch_url", 268 location=OpenApiParameter.QUERY, 269 type=OpenApiTypes.BOOL, 270 ), 271 ] 272 ) 273 def list(self, request: Request) -> Response: 274 """Custom list method that checks Policy based access instead of guardian""" 275 should_cache = request.query_params.get("search", "") == "" 276 277 superuser_full_list = ( 278 str(request.query_params.get("superuser_full_list", "false")).lower() == "true" 279 ) 280 if superuser_full_list and request.user.is_superuser: 281 return super().list(request) 282 283 only_with_launch_url = ( 284 str(request.query_params.get("only_with_launch_url", "false")).lower() 285 ) == "true" 286 287 queryset = self._filter_queryset_for_list(self.get_queryset()) 288 queryset = queryset.exclude(meta_hide=True) 289 if only_with_launch_url: 290 # Pre-filter at DB level to skip expensive per-app policy evaluation 291 # for apps that can never appear in the launcher (no meta_launch_url 292 # and no provider, so no possible launch URL). 293 queryset = queryset.exclude(meta_launch_url="", provider__isnull=True) 294 paginator: Pagination = self.paginator 295 paginated_apps = paginator.paginate_queryset(queryset, request) 296 297 if "for_user" in request.query_params: 298 try: 299 for_user: int = int(request.query_params.get("for_user", 0)) 300 for_user = ( 301 get_objects_for_user(request.user, "authentik_core.view_user_applications") 302 .filter(pk=for_user) 303 .first() 304 ) 305 if not for_user: 306 raise ValidationError({"for_user": "User not found"}) 307 except ValueError as exc: 308 raise ValidationError from exc 309 allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user) 310 311 serializer = self.get_serializer(allowed_applications, many=True) 312 return self.get_paginated_response(serializer.data) 313 314 allowed_applications = [] 315 if not should_cache: 316 allowed_applications = self._get_allowed_applications(paginated_apps) 317 if should_cache: 318 allowed_applications = cache.get( 319 user_app_cache_key( 320 self.request.user.pk, paginator.page.number, only_with_launch_url 321 ) 322 ) 323 if allowed_applications: 324 # Re-fetch cached applications since pickled instances lose prefetched 325 # relationships, causing N+1 queries during serialization 326 allowed_applications = self._expand_applications(allowed_applications) 327 else: 328 LOGGER.debug("Caching allowed application list", page=paginator.page.number) 329 allowed_applications = self._get_allowed_applications(paginated_apps) 330 cache.set( 331 user_app_cache_key( 332 self.request.user.pk, paginator.page.number, only_with_launch_url 333 ), 334 allowed_applications, 335 timeout=86400, 336 ) 337 338 if only_with_launch_url: 339 allowed_applications = self._filter_applications_with_launch_url(allowed_applications) 340 341 serializer = self.get_serializer(allowed_applications, many=True) 342 return self.get_paginated_response(serializer.data)
Application Viewset
serializer_class =
<class 'ApplicationSerializer'>
filterset_fields =
['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
@extend_schema(parameters=[OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT)], responses={200: PolicyTestResultSerializer()})
@action(detail=True, methods=['GET'])
def
check_access( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
208 @extend_schema( 209 parameters=[ 210 OpenApiParameter( 211 name="for_user", 212 location=OpenApiParameter.QUERY, 213 type=OpenApiTypes.INT, 214 ) 215 ], 216 responses={ 217 200: PolicyTestResultSerializer(), 218 }, 219 ) 220 @action(detail=True, methods=["GET"]) 221 def check_access(self, request: Request, slug: str) -> Response: 222 """Check access to a single application by slug""" 223 # Don't use self.get_object as that checks for view_application permission 224 # which the user might not have, even if they have access 225 application = get_object_or_404(Application, slug=slug) 226 # If the current user is superuser, they can set `for_user` 227 for_user = request.user 228 if request.user.is_superuser and "for_user" in request.query_params: 229 try: 230 for_user = User.objects.filter(pk=request.query_params.get("for_user")).first() 231 except ValueError: 232 raise ValidationError({"for_user": "for_user must be numerical"}) from None 233 if not for_user: 234 raise ValidationError({"for_user": "User not found"}) 235 engine = PolicyEngine(application, for_user, request) 236 engine.empty_result = AppAccessWithoutBindings.get() 237 engine.use_cache = False 238 with capture_logs() as logs: 239 engine.build() 240 result = engine.result 241 response = PolicyTestResultSerializer(PolicyResult(False)) 242 if result.passing: 243 response = PolicyTestResultSerializer(PolicyResult(True)) 244 if request.user.is_superuser: 245 log_messages = [] 246 for log in logs: 247 if log.attributes.get("process", "") == "PolicyProcess": 248 continue 249 log_messages.append(LogEventSerializer(log).data) 250 result.log_messages = log_messages 251 response = PolicyTestResultSerializer(result) 252 return Response(response.data)
Check access to a single application by slug
@extend_schema(parameters=[OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL), OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT), OpenApiParameter(name='only_with_launch_url', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
def
list( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
254 @extend_schema( 255 parameters=[ 256 OpenApiParameter( 257 name="superuser_full_list", 258 location=OpenApiParameter.QUERY, 259 type=OpenApiTypes.BOOL, 260 ), 261 OpenApiParameter( 262 name="for_user", 263 location=OpenApiParameter.QUERY, 264 type=OpenApiTypes.INT, 265 ), 266 OpenApiParameter( 267 name="only_with_launch_url", 268 location=OpenApiParameter.QUERY, 269 type=OpenApiTypes.BOOL, 270 ), 271 ] 272 ) 273 def list(self, request: Request) -> Response: 274 """Custom list method that checks Policy based access instead of guardian""" 275 should_cache = request.query_params.get("search", "") == "" 276 277 superuser_full_list = ( 278 str(request.query_params.get("superuser_full_list", "false")).lower() == "true" 279 ) 280 if superuser_full_list and request.user.is_superuser: 281 return super().list(request) 282 283 only_with_launch_url = ( 284 str(request.query_params.get("only_with_launch_url", "false")).lower() 285 ) == "true" 286 287 queryset = self._filter_queryset_for_list(self.get_queryset()) 288 queryset = queryset.exclude(meta_hide=True) 289 if only_with_launch_url: 290 # Pre-filter at DB level to skip expensive per-app policy evaluation 291 # for apps that can never appear in the launcher (no meta_launch_url 292 # and no provider, so no possible launch URL). 293 queryset = queryset.exclude(meta_launch_url="", provider__isnull=True) 294 paginator: Pagination = self.paginator 295 paginated_apps = paginator.paginate_queryset(queryset, request) 296 297 if "for_user" in request.query_params: 298 try: 299 for_user: int = int(request.query_params.get("for_user", 0)) 300 for_user = ( 301 get_objects_for_user(request.user, "authentik_core.view_user_applications") 302 .filter(pk=for_user) 303 .first() 304 ) 305 if not for_user: 306 raise ValidationError({"for_user": "User not found"}) 307 except ValueError as exc: 308 raise ValidationError from exc 309 allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user) 310 311 serializer = self.get_serializer(allowed_applications, many=True) 312 return self.get_paginated_response(serializer.data) 313 314 allowed_applications = [] 315 if not should_cache: 316 allowed_applications = self._get_allowed_applications(paginated_apps) 317 if should_cache: 318 allowed_applications = cache.get( 319 user_app_cache_key( 320 self.request.user.pk, paginator.page.number, only_with_launch_url 321 ) 322 ) 323 if allowed_applications: 324 # Re-fetch cached applications since pickled instances lose prefetched 325 # relationships, causing N+1 queries during serialization 326 allowed_applications = self._expand_applications(allowed_applications) 327 else: 328 LOGGER.debug("Caching allowed application list", page=paginator.page.number) 329 allowed_applications = self._get_allowed_applications(paginated_apps) 330 cache.set( 331 user_app_cache_key( 332 self.request.user.pk, paginator.page.number, only_with_launch_url 333 ), 334 allowed_applications, 335 timeout=86400, 336 ) 337 338 if only_with_launch_url: 339 allowed_applications = self._filter_applications_with_launch_url(allowed_applications) 340 341 serializer = self.get_serializer(allowed_applications, many=True) 342 return self.get_paginated_response(serializer.data)
Custom list method that checks Policy based access instead of guardian