authentik.core.api.applications

Application API Views

  1"""Application API Views"""
  2
  3from collections.abc import Iterator
  4from copy import copy
  5
  6from django.core.cache import cache
  7from django.db.models import Case, QuerySet
  8from django.db.models.expressions import When
  9from django.shortcuts import get_object_or_404
 10from django.utils.translation import gettext as _
 11from drf_spectacular.types import OpenApiTypes
 12from drf_spectacular.utils import OpenApiParameter, extend_schema
 13from guardian.shortcuts import get_objects_for_user
 14from rest_framework.decorators import action
 15from rest_framework.exceptions import ValidationError
 16from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField
 17from rest_framework.request import Request
 18from rest_framework.response import Response
 19from rest_framework.viewsets import ModelViewSet
 20from structlog.stdlib import get_logger
 21
 22from authentik.api.pagination import Pagination
 23from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 24from authentik.core.api.providers import ProviderSerializer
 25from authentik.core.api.used_by import UsedByMixin
 26from authentik.core.api.users import UserSerializer
 27from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
 28from authentik.core.apps import AppAccessWithoutBindings
 29from authentik.core.models import Application, User
 30from authentik.events.logs import LogEventSerializer, capture_logs
 31from authentik.policies.api.exec import PolicyTestResultSerializer
 32from authentik.policies.engine import PolicyEngine
 33from authentik.policies.types import CACHE_PREFIX, PolicyResult
 34from authentik.rbac.filters import ObjectFilter
 35
 36LOGGER = get_logger()
 37
 38
 39def user_app_cache_key(
 40    user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False
 41) -> str:
 42    """Cache key where application list for user is saved"""
 43    key = f"{CACHE_PREFIX}app_access/{user_pk}"
 44    if only_with_launch_url:
 45        key += "/launch"
 46    if page_number:
 47        key += f"/{page_number}"
 48    return key
 49
 50
 51class ApplicationSerializer(ModelSerializer):
 52    """Application Serializer"""
 53
 54    launch_url = SerializerMethodField()
 55    provider_obj = ProviderSerializer(
 56        source="get_provider",
 57        required=False,
 58        read_only=True,
 59        allow_null=True,
 60    )
 61    backchannel_providers_obj = ProviderSerializer(
 62        source="backchannel_providers", required=False, read_only=True, many=True
 63    )
 64
 65    meta_icon_url = ReadOnlyField(source="get_meta_icon")
 66    meta_icon_themed_urls = ThemedUrlsSerializer(
 67        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
 68    )
 69
 70    def get_launch_url(self, app: Application) -> str | None:
 71        """Allow formatting of launch URL"""
 72        user = None
 73        user_data = None
 74
 75        if "request" in self.context:
 76            user = self.context["request"].user
 77
 78        # Cache serialized user data to avoid N+1 when formatting launch URLs
 79        # for multiple applications. UserSerializer accesses user.groups which
 80        # would otherwise trigger a query for each application.
 81        if user is not None:
 82            if "_cached_user_data" not in self.context:
 83                # Prefetch groups to avoid N+1
 84                self.context["_cached_user_data"] = UserSerializer(instance=user).data
 85            user_data = self.context["_cached_user_data"]
 86
 87        return app.get_launch_url(user, user_data=user_data)
 88
 89    def validate_slug(self, slug: str) -> str:
 90        if slug in Application.reserved_slugs:
 91            raise ValidationError(
 92                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
 93                    slug=slug
 94                )
 95            )
 96        return slug
 97
 98    def __init__(self, *args, **kwargs) -> None:
 99        super().__init__(*args, **kwargs)
100        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
101            self.fields["icon"] = CharField(source="meta_icon", required=False)
102
103    class Meta:
104        model = Application
105        fields = [
106            "pk",
107            "name",
108            "slug",
109            "provider",
110            "provider_obj",
111            "backchannel_providers",
112            "backchannel_providers_obj",
113            "launch_url",
114            "open_in_new_tab",
115            "meta_launch_url",
116            "meta_icon",
117            "meta_icon_url",
118            "meta_icon_themed_urls",
119            "meta_description",
120            "meta_publisher",
121            "policy_engine_mode",
122            "group",
123            "meta_hide",
124        ]
125        extra_kwargs = {
126            "backchannel_providers": {"required": False},
127        }
128
129
class ApplicationViewSet(UsedByMixin, ModelViewSet):
    """Application Viewset"""

    queryset = (
        Application.objects.all()
        .with_provider()
        .prefetch_related("policies")
        .prefetch_related("backchannel_providers")
    )
    serializer_class = ApplicationSerializer
    search_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    filterset_fields = [
        "name",
        "slug",
        "meta_launch_url",
        "meta_description",
        "meta_publisher",
        "group",
    ]
    lookup_field = "slug"
    ordering = ["name"]

    # NOTE: the helpers below are annotated with ``list[...]`` and must stay
    # above the ``list`` method; once that method is defined, the name ``list``
    # in the class body refers to it rather than to the builtin.

    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
        for backend in self.filter_backends:
            # Skip the object-permission filter; access is decided by policies instead
            if backend == ObjectFilter:
                continue
            queryset = backend().filter_queryset(self.request, queryset, self)
        return queryset

    def _get_allowed_applications(
        self, paginated_apps: Iterator[Application], user: User | None = None
    ) -> list[Application]:
        """Return the applications from `paginated_apps` whose policy engine passes.

        When `user` is given, policies are evaluated for that user instead of
        the user attached to the current request.
        """
        applications = []
        request = self.request._request
        if user:
            # Work on a copy so the user of the real request is not replaced
            request = copy(request)
            request.user = user
        for application in paginated_apps:
            engine = PolicyEngine(application, request.user, request)
            engine.empty_result = AppAccessWithoutBindings.get()
            engine.build()
            if engine.passing:
                applications.append(application)
        return applications

    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
        """
        Re-fetch with proper prefetching for serialization
        Cached applications don't have prefetched relationships, causing N+1 queries
        during serialization when get_provider() is called

        The returned queryset keeps the order of `applications`; an empty input
        yields an empty queryset.
        """
        if not applications:
            return self.get_queryset().none()
        pks = [app.pk for app in applications]
        return (
            self.get_queryset()
            .filter(pk__in=pks)
            # Order rows by the position of their pk in the cached list
            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
        )

    def _filter_applications_with_launch_url(
        self, paginated_apps: QuerySet[Application]
    ) -> list[Application]:
        """Return only the applications for which get_launch_url() is truthy"""
        return [app for app in paginated_apps if app.get_launch_url()]

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            )
        ],
        responses={
            200: PolicyTestResultSerializer(),
        },
    )
    @action(detail=True, methods=["GET"])
    def check_access(self, request: Request, slug: str) -> Response:
        """Check access to a single application by slug"""
        # Don't use self.get_object as that checks for view_application permission
        # which the user might not have, even if they have access
        application = get_object_or_404(Application, slug=slug)
        # If the current user is superuser, they can set `for_user`
        for_user = request.user
        if request.user.is_superuser and "for_user" in request.query_params:
            try:
                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
            except ValueError:
                raise ValidationError({"for_user": "for_user must be numerical"}) from None
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
        engine = PolicyEngine(application, for_user, request)
        engine.empty_result = AppAccessWithoutBindings.get()
        # Always evaluate the policies for this check instead of using cached results
        engine.use_cache = False
        with capture_logs() as logs:
            engine.build()
            result = engine.result
        if request.user.is_superuser:
            # Superusers get the full result including the captured log messages,
            # except those whose "process" attribute is "PolicyProcess"
            result.log_messages = [
                LogEventSerializer(log).data
                for log in logs
                if log.attributes.get("process", "") != "PolicyProcess"
            ]
            response = PolicyTestResultSerializer(result)
        else:
            # Everyone else only learns whether access passes, nothing more
            response = PolicyTestResultSerializer(PolicyResult(bool(result.passing)))
        return Response(response.data)

    @extend_schema(
        parameters=[
            OpenApiParameter(
                name="superuser_full_list",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
            OpenApiParameter(
                name="for_user",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.INT,
            ),
            OpenApiParameter(
                name="only_with_launch_url",
                location=OpenApiParameter.QUERY,
                type=OpenApiTypes.BOOL,
            ),
        ]
    )
    def list(self, request: Request) -> Response:
        """Custom list method that checks Policy based access instead of guardian

        Query parameters read here:

        - `superuser_full_list`: when "true" and the requester is a superuser,
          the default list implementation is used.
        - `for_user`: evaluate access for another user; that user must be
          visible to the requester through the
          `authentik_core.view_user_applications` permission.
        - `only_with_launch_url`: only return applications with a launch URL.
        - `search`: when non-empty, the result is not cached.

        Raises `ValidationError` when `for_user` is not numerical or no
        matching user is found.
        """
        should_cache = request.query_params.get("search", "") == ""

        superuser_full_list = (
            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
        )
        if superuser_full_list and request.user.is_superuser:
            return super().list(request)

        only_with_launch_url = (
            str(request.query_params.get("only_with_launch_url", "false")).lower() == "true"
        )

        queryset = self._filter_queryset_for_list(self.get_queryset())
        queryset = queryset.exclude(meta_hide=True)
        if only_with_launch_url:
            # Pre-filter at DB level to skip expensive per-app policy evaluation
            # for apps that can never appear in the launcher (no meta_launch_url
            # and no provider, so no possible launch URL).
            queryset = queryset.exclude(meta_launch_url="", provider__isnull=True)
        paginator: Pagination = self.paginator
        paginated_apps = paginator.paginate_queryset(queryset, request)

        if "for_user" in request.query_params:
            try:
                for_user_pk = int(request.query_params.get("for_user", 0))
            except ValueError as exc:
                # Same error detail as check_access uses for a non-numeric value
                raise ValidationError({"for_user": "for_user must be numerical"}) from exc
            for_user = (
                get_objects_for_user(request.user, "authentik_core.view_user_applications")
                .filter(pk=for_user_pk)
                .first()
            )
            if not for_user:
                raise ValidationError({"for_user": "User not found"})
            # NOTE(review): this branch is never cached and does not apply
            # _filter_applications_with_launch_url; confirm that is intended.
            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)

            serializer = self.get_serializer(allowed_applications, many=True)
            return self.get_paginated_response(serializer.data)

        if not should_cache:
            allowed_applications = self._get_allowed_applications(paginated_apps)
        else:
            # NOTE(review): the key varies only by user, page number and
            # only_with_launch_url; confirm that requests using other query
            # parameters (filters, ordering, page size) may share an entry.
            cache_key = user_app_cache_key(
                self.request.user.pk, paginator.page.number, only_with_launch_url
            )
            cached_applications = cache.get(cache_key)
            # Compare against None rather than truthiness: an empty list is a
            # valid cached result ("no allowed applications on this page") and
            # must not trigger a new policy evaluation on every request.
            if cached_applications is not None:
                # Re-fetch cached applications since pickled instances lose prefetched
                # relationships, causing N+1 queries during serialization
                allowed_applications = self._expand_applications(cached_applications)
            else:
                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
                allowed_applications = self._get_allowed_applications(paginated_apps)
                # 86400 seconds = 24 hours
                cache.set(cache_key, allowed_applications, timeout=86400)

        if only_with_launch_url:
            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)

        serializer = self.get_serializer(allowed_applications, many=True)
        return self.get_paginated_response(serializer.data)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def user_app_cache_key( user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False) -> str:
40def user_app_cache_key(
41    user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False
42) -> str:
43    """Cache key where application list for user is saved"""
44    key = f"{CACHE_PREFIX}app_access/{user_pk}"
45    if only_with_launch_url:
46        key += "/launch"
47    if page_number:
48        key += f"/{page_number}"
49    return key

Cache key where application list for user is saved

class ApplicationSerializer(authentik.core.api.utils.ModelSerializer):
 52class ApplicationSerializer(ModelSerializer):
 53    """Application Serializer"""
 54
 55    launch_url = SerializerMethodField()
 56    provider_obj = ProviderSerializer(
 57        source="get_provider",
 58        required=False,
 59        read_only=True,
 60        allow_null=True,
 61    )
 62    backchannel_providers_obj = ProviderSerializer(
 63        source="backchannel_providers", required=False, read_only=True, many=True
 64    )
 65
 66    meta_icon_url = ReadOnlyField(source="get_meta_icon")
 67    meta_icon_themed_urls = ThemedUrlsSerializer(
 68        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
 69    )
 70
 71    def get_launch_url(self, app: Application) -> str | None:
 72        """Allow formatting of launch URL"""
 73        user = None
 74        user_data = None
 75
 76        if "request" in self.context:
 77            user = self.context["request"].user
 78
 79        # Cache serialized user data to avoid N+1 when formatting launch URLs
 80        # for multiple applications. UserSerializer accesses user.groups which
 81        # would otherwise trigger a query for each application.
 82        if user is not None:
 83            if "_cached_user_data" not in self.context:
 84                # Prefetch groups to avoid N+1
 85                self.context["_cached_user_data"] = UserSerializer(instance=user).data
 86            user_data = self.context["_cached_user_data"]
 87
 88        return app.get_launch_url(user, user_data=user_data)
 89
 90    def validate_slug(self, slug: str) -> str:
 91        if slug in Application.reserved_slugs:
 92            raise ValidationError(
 93                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
 94                    slug=slug
 95                )
 96            )
 97        return slug
 98
 99    def __init__(self, *args, **kwargs) -> None:
100        super().__init__(*args, **kwargs)
101        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
102            self.fields["icon"] = CharField(source="meta_icon", required=False)
103
104    class Meta:
105        model = Application
106        fields = [
107            "pk",
108            "name",
109            "slug",
110            "provider",
111            "provider_obj",
112            "backchannel_providers",
113            "backchannel_providers_obj",
114            "launch_url",
115            "open_in_new_tab",
116            "meta_launch_url",
117            "meta_icon",
118            "meta_icon_url",
119            "meta_icon_themed_urls",
120            "meta_description",
121            "meta_publisher",
122            "policy_engine_mode",
123            "group",
124            "meta_hide",
125        ]
126        extra_kwargs = {
127            "backchannel_providers": {"required": False},
128        }

Application Serializer

ApplicationSerializer(*args, **kwargs)
 99    def __init__(self, *args, **kwargs) -> None:
100        super().__init__(*args, **kwargs)
101        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
102            self.fields["icon"] = CharField(source="meta_icon", required=False)
launch_url
provider_obj
backchannel_providers_obj
meta_icon_url
meta_icon_themed_urls
def get_launch_url(self, app: authentik.core.models.Application) -> str | None:
71    def get_launch_url(self, app: Application) -> str | None:
72        """Allow formatting of launch URL"""
73        user = None
74        user_data = None
75
76        if "request" in self.context:
77            user = self.context["request"].user
78
79        # Cache serialized user data to avoid N+1 when formatting launch URLs
80        # for multiple applications. UserSerializer accesses user.groups which
81        # would otherwise trigger a query for each application.
82        if user is not None:
83            if "_cached_user_data" not in self.context:
84                # Prefetch groups to avoid N+1
85                self.context["_cached_user_data"] = UserSerializer(instance=user).data
86            user_data = self.context["_cached_user_data"]
87
88        return app.get_launch_url(user, user_data=user_data)

Allow formatting of launch URL

def validate_slug(self, slug: str) -> str:
90    def validate_slug(self, slug: str) -> str:
91        if slug in Application.reserved_slugs:
92            raise ValidationError(
93                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
94                    slug=slug
95                )
96            )
97        return slug
class ApplicationSerializer.Meta:
104    class Meta:
105        model = Application
106        fields = [
107            "pk",
108            "name",
109            "slug",
110            "provider",
111            "provider_obj",
112            "backchannel_providers",
113            "backchannel_providers_obj",
114            "launch_url",
115            "open_in_new_tab",
116            "meta_launch_url",
117            "meta_icon",
118            "meta_icon_url",
119            "meta_icon_themed_urls",
120            "meta_description",
121            "meta_publisher",
122            "policy_engine_mode",
123            "group",
124            "meta_hide",
125        ]
126        extra_kwargs = {
127            "backchannel_providers": {"required": False},
128        }
fields = ['pk', 'name', 'slug', 'provider', 'provider_obj', 'backchannel_providers', 'backchannel_providers_obj', 'launch_url', 'open_in_new_tab', 'meta_launch_url', 'meta_icon', 'meta_icon_url', 'meta_icon_themed_urls', 'meta_description', 'meta_publisher', 'policy_engine_mode', 'group', 'meta_hide']
extra_kwargs = {'backchannel_providers': {'required': False}}
class ApplicationViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
131class ApplicationViewSet(UsedByMixin, ModelViewSet):
132    """Application Viewset"""
133
134    queryset = (
135        Application.objects.all()
136        .with_provider()
137        .prefetch_related("policies")
138        .prefetch_related("backchannel_providers")
139    )
140    serializer_class = ApplicationSerializer
141    search_fields = [
142        "name",
143        "slug",
144        "meta_launch_url",
145        "meta_description",
146        "meta_publisher",
147        "group",
148    ]
149    filterset_fields = [
150        "name",
151        "slug",
152        "meta_launch_url",
153        "meta_description",
154        "meta_publisher",
155        "group",
156    ]
157    lookup_field = "slug"
158    ordering = ["name"]
159
160    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
161        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
162        for backend in list(self.filter_backends):
163            if backend == ObjectFilter:
164                continue
165            queryset = backend().filter_queryset(self.request, queryset, self)
166        return queryset
167
168    def _get_allowed_applications(
169        self, paginated_apps: Iterator[Application], user: User | None = None
170    ) -> list[Application]:
171        applications = []
172        request = self.request._request
173        if user:
174            request = copy(request)
175            request.user = user
176        for application in paginated_apps:
177            engine = PolicyEngine(application, request.user, request)
178            engine.empty_result = AppAccessWithoutBindings.get()
179            engine.build()
180            if engine.passing:
181                applications.append(application)
182        return applications
183
184    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
185        """
186        Re-fetch with proper prefetching for serialization
187        Cached applications don't have prefetched relationships, causing N+1 queries
188        during serialization when get_provider() is called
189        """
190        if not applications:
191            return self.get_queryset().none()
192        pks = [app.pk for app in applications]
193        return (
194            self.get_queryset()
195            .filter(pk__in=pks)
196            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
197        )
198
199    def _filter_applications_with_launch_url(
200        self, paginated_apps: QuerySet[Application]
201    ) -> list[Application]:
202        applications = []
203        for app in paginated_apps:
204            if app.get_launch_url():
205                applications.append(app)
206        return applications
207
208    @extend_schema(
209        parameters=[
210            OpenApiParameter(
211                name="for_user",
212                location=OpenApiParameter.QUERY,
213                type=OpenApiTypes.INT,
214            )
215        ],
216        responses={
217            200: PolicyTestResultSerializer(),
218        },
219    )
220    @action(detail=True, methods=["GET"])
221    def check_access(self, request: Request, slug: str) -> Response:
222        """Check access to a single application by slug"""
223        # Don't use self.get_object as that checks for view_application permission
224        # which the user might not have, even if they have access
225        application = get_object_or_404(Application, slug=slug)
226        # If the current user is superuser, they can set `for_user`
227        for_user = request.user
228        if request.user.is_superuser and "for_user" in request.query_params:
229            try:
230                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
231            except ValueError:
232                raise ValidationError({"for_user": "for_user must be numerical"}) from None
233            if not for_user:
234                raise ValidationError({"for_user": "User not found"})
235        engine = PolicyEngine(application, for_user, request)
236        engine.empty_result = AppAccessWithoutBindings.get()
237        engine.use_cache = False
238        with capture_logs() as logs:
239            engine.build()
240            result = engine.result
241        response = PolicyTestResultSerializer(PolicyResult(False))
242        if result.passing:
243            response = PolicyTestResultSerializer(PolicyResult(True))
244        if request.user.is_superuser:
245            log_messages = []
246            for log in logs:
247                if log.attributes.get("process", "") == "PolicyProcess":
248                    continue
249                log_messages.append(LogEventSerializer(log).data)
250            result.log_messages = log_messages
251            response = PolicyTestResultSerializer(result)
252        return Response(response.data)
253
254    @extend_schema(
255        parameters=[
256            OpenApiParameter(
257                name="superuser_full_list",
258                location=OpenApiParameter.QUERY,
259                type=OpenApiTypes.BOOL,
260            ),
261            OpenApiParameter(
262                name="for_user",
263                location=OpenApiParameter.QUERY,
264                type=OpenApiTypes.INT,
265            ),
266            OpenApiParameter(
267                name="only_with_launch_url",
268                location=OpenApiParameter.QUERY,
269                type=OpenApiTypes.BOOL,
270            ),
271        ]
272    )
273    def list(self, request: Request) -> Response:
274        """Custom list method that checks Policy based access instead of guardian"""
275        should_cache = request.query_params.get("search", "") == ""
276
277        superuser_full_list = (
278            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
279        )
280        if superuser_full_list and request.user.is_superuser:
281            return super().list(request)
282
283        only_with_launch_url = (
284            str(request.query_params.get("only_with_launch_url", "false")).lower()
285        ) == "true"
286
287        queryset = self._filter_queryset_for_list(self.get_queryset())
288        queryset = queryset.exclude(meta_hide=True)
289        if only_with_launch_url:
290            # Pre-filter at DB level to skip expensive per-app policy evaluation
291            # for apps that can never appear in the launcher (no meta_launch_url
292            # and no provider, so no possible launch URL).
293            queryset = queryset.exclude(meta_launch_url="", provider__isnull=True)
294        paginator: Pagination = self.paginator
295        paginated_apps = paginator.paginate_queryset(queryset, request)
296
297        if "for_user" in request.query_params:
298            try:
299                for_user: int = int(request.query_params.get("for_user", 0))
300                for_user = (
301                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
302                    .filter(pk=for_user)
303                    .first()
304                )
305                if not for_user:
306                    raise ValidationError({"for_user": "User not found"})
307            except ValueError as exc:
308                raise ValidationError from exc
309            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
310
311            serializer = self.get_serializer(allowed_applications, many=True)
312            return self.get_paginated_response(serializer.data)
313
314        allowed_applications = []
315        if not should_cache:
316            allowed_applications = self._get_allowed_applications(paginated_apps)
317        if should_cache:
318            allowed_applications = cache.get(
319                user_app_cache_key(
320                    self.request.user.pk, paginator.page.number, only_with_launch_url
321                )
322            )
323            if allowed_applications:
324                # Re-fetch cached applications since pickled instances lose prefetched
325                # relationships, causing N+1 queries during serialization
326                allowed_applications = self._expand_applications(allowed_applications)
327            else:
328                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
329                allowed_applications = self._get_allowed_applications(paginated_apps)
330                cache.set(
331                    user_app_cache_key(
332                        self.request.user.pk, paginator.page.number, only_with_launch_url
333                    ),
334                    allowed_applications,
335                    timeout=86400,
336                )
337
338        if only_with_launch_url:
339            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)
340
341        serializer = self.get_serializer(allowed_applications, many=True)
342        return self.get_paginated_response(serializer.data)

Application Viewset

queryset = <ApplicationQuerySet []>
serializer_class = <class 'ApplicationSerializer'>
search_fields = ['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
filterset_fields = ['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
lookup_field = 'slug'
ordering = ['name']
@extend_schema(parameters=[OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT)], responses={200: PolicyTestResultSerializer()})
@action(detail=True, methods=['GET'])
def check_access( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
208    @extend_schema(
209        parameters=[
210            OpenApiParameter(
211                name="for_user",
212                location=OpenApiParameter.QUERY,
213                type=OpenApiTypes.INT,
214            )
215        ],
216        responses={
217            200: PolicyTestResultSerializer(),
218        },
219    )
220    @action(detail=True, methods=["GET"])
221    def check_access(self, request: Request, slug: str) -> Response:
222        """Check access to a single application by slug"""
223        # Don't use self.get_object as that checks for view_application permission
224        # which the user might not have, even if they have access
225        application = get_object_or_404(Application, slug=slug)
226        # If the current user is superuser, they can set `for_user`
227        for_user = request.user
228        if request.user.is_superuser and "for_user" in request.query_params:
229            try:
230                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
231            except ValueError:
232                raise ValidationError({"for_user": "for_user must be numerical"}) from None
233            if not for_user:
234                raise ValidationError({"for_user": "User not found"})
235        engine = PolicyEngine(application, for_user, request)
236        engine.empty_result = AppAccessWithoutBindings.get()
237        engine.use_cache = False
238        with capture_logs() as logs:
239            engine.build()
240            result = engine.result
241        response = PolicyTestResultSerializer(PolicyResult(False))
242        if result.passing:
243            response = PolicyTestResultSerializer(PolicyResult(True))
244        if request.user.is_superuser:
245            log_messages = []
246            for log in logs:
247                if log.attributes.get("process", "") == "PolicyProcess":
248                    continue
249                log_messages.append(LogEventSerializer(log).data)
250            result.log_messages = log_messages
251            response = PolicyTestResultSerializer(result)
252        return Response(response.data)

Check access to a single application by slug

@extend_schema(parameters=[OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL), OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT), OpenApiParameter(name='only_with_launch_url', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
def list(self, request: rest_framework.request.Request) -> rest_framework.response.Response:
254    @extend_schema(
255        parameters=[
256            OpenApiParameter(
257                name="superuser_full_list",
258                location=OpenApiParameter.QUERY,
259                type=OpenApiTypes.BOOL,
260            ),
261            OpenApiParameter(
262                name="for_user",
263                location=OpenApiParameter.QUERY,
264                type=OpenApiTypes.INT,
265            ),
266            OpenApiParameter(
267                name="only_with_launch_url",
268                location=OpenApiParameter.QUERY,
269                type=OpenApiTypes.BOOL,
270            ),
271        ]
272    )
273    def list(self, request: Request) -> Response:
274        """Custom list method that checks Policy based access instead of guardian"""
275        should_cache = request.query_params.get("search", "") == ""
276
277        superuser_full_list = (
278            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
279        )
280        if superuser_full_list and request.user.is_superuser:
281            return super().list(request)
282
283        only_with_launch_url = (
284            str(request.query_params.get("only_with_launch_url", "false")).lower()
285        ) == "true"
286
287        queryset = self._filter_queryset_for_list(self.get_queryset())
288        queryset = queryset.exclude(meta_hide=True)
289        if only_with_launch_url:
290            # Pre-filter at DB level to skip expensive per-app policy evaluation
291            # for apps that can never appear in the launcher (no meta_launch_url
292            # and no provider, so no possible launch URL).
293            queryset = queryset.exclude(meta_launch_url="", provider__isnull=True)
294        paginator: Pagination = self.paginator
295        paginated_apps = paginator.paginate_queryset(queryset, request)
296
297        if "for_user" in request.query_params:
298            try:
299                for_user: int = int(request.query_params.get("for_user", 0))
300                for_user = (
301                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
302                    .filter(pk=for_user)
303                    .first()
304                )
305                if not for_user:
306                    raise ValidationError({"for_user": "User not found"})
307            except ValueError as exc:
308                raise ValidationError from exc
309            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
310
311            serializer = self.get_serializer(allowed_applications, many=True)
312            return self.get_paginated_response(serializer.data)
313
314        allowed_applications = []
315        if not should_cache:
316            allowed_applications = self._get_allowed_applications(paginated_apps)
317        if should_cache:
318            allowed_applications = cache.get(
319                user_app_cache_key(
320                    self.request.user.pk, paginator.page.number, only_with_launch_url
321                )
322            )
323            if allowed_applications:
324                # Re-fetch cached applications since pickled instances lose prefetched
325                # relationships, causing N+1 queries during serialization
326                allowed_applications = self._expand_applications(allowed_applications)
327            else:
328                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
329                allowed_applications = self._get_allowed_applications(paginated_apps)
330                cache.set(
331                    user_app_cache_key(
332                        self.request.user.pk, paginator.page.number, only_with_launch_url
333                    ),
334                    allowed_applications,
335                    timeout=86400,
336                )
337
338        if only_with_launch_url:
339            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)
340
341        serializer = self.get_serializer(allowed_applications, many=True)
342        return self.get_paginated_response(serializer.data)

Custom list method that checks Policy based access instead of guardian

name = None
description = None
suffix = None
detail = None
basename = None