authentik.core.api.applications

Application API Views

  1"""Application API Views"""
  2
  3from collections.abc import Iterator
  4from copy import copy
  5
  6from django.core.cache import cache
  7from django.db.models import Case, QuerySet
  8from django.db.models.expressions import When
  9from django.shortcuts import get_object_or_404
 10from django.utils.translation import gettext as _
 11from drf_spectacular.types import OpenApiTypes
 12from drf_spectacular.utils import OpenApiParameter, extend_schema
 13from guardian.shortcuts import get_objects_for_user
 14from rest_framework.decorators import action
 15from rest_framework.exceptions import ValidationError
 16from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField
 17from rest_framework.request import Request
 18from rest_framework.response import Response
 19from rest_framework.viewsets import ModelViewSet
 20from structlog.stdlib import get_logger
 21
 22from authentik.api.pagination import Pagination
 23from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 24from authentik.core.api.providers import ProviderSerializer
 25from authentik.core.api.used_by import UsedByMixin
 26from authentik.core.api.users import UserSerializer
 27from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
 28from authentik.core.apps import AppAccessWithoutBindings
 29from authentik.core.models import Application, User
 30from authentik.events.logs import LogEventSerializer, capture_logs
 31from authentik.policies.api.exec import PolicyTestResultSerializer
 32from authentik.policies.engine import PolicyEngine
 33from authentik.policies.types import CACHE_PREFIX, PolicyResult
 34from authentik.rbac.filters import ObjectFilter
 35
 36LOGGER = get_logger()
 37
 38
 39def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
 40    """Cache key where application list for user is saved"""
 41    key = f"{CACHE_PREFIX}app_access/{user_pk}"
 42    if page_number:
 43        key += f"/{page_number}"
 44    return key
 45
 46
 47class ApplicationSerializer(ModelSerializer):
 48    """Application Serializer"""
 49
 50    launch_url = SerializerMethodField()
 51    provider_obj = ProviderSerializer(
 52        source="get_provider",
 53        required=False,
 54        read_only=True,
 55        allow_null=True,
 56    )
 57    backchannel_providers_obj = ProviderSerializer(
 58        source="backchannel_providers", required=False, read_only=True, many=True
 59    )
 60
 61    meta_icon_url = ReadOnlyField(source="get_meta_icon")
 62    meta_icon_themed_urls = ThemedUrlsSerializer(
 63        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
 64    )
 65
 66    def get_launch_url(self, app: Application) -> str | None:
 67        """Allow formatting of launch URL"""
 68        user = None
 69        user_data = None
 70
 71        if "request" in self.context:
 72            user = self.context["request"].user
 73
 74        # Cache serialized user data to avoid N+1 when formatting launch URLs
 75        # for multiple applications. UserSerializer accesses user.groups which
 76        # would otherwise trigger a query for each application.
 77        if user is not None:
 78            if "_cached_user_data" not in self.context:
 79                # Prefetch groups to avoid N+1
 80                self.context["_cached_user_data"] = UserSerializer(instance=user).data
 81            user_data = self.context["_cached_user_data"]
 82
 83        return app.get_launch_url(user, user_data=user_data)
 84
 85    def validate_slug(self, slug: str) -> str:
 86        if slug in Application.reserved_slugs:
 87            raise ValidationError(
 88                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
 89                    slug=slug
 90                )
 91            )
 92        return slug
 93
 94    def __init__(self, *args, **kwargs) -> None:
 95        super().__init__(*args, **kwargs)
 96        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 97            self.fields["icon"] = CharField(source="meta_icon", required=False)
 98
 99    class Meta:
100        model = Application
101        fields = [
102            "pk",
103            "name",
104            "slug",
105            "provider",
106            "provider_obj",
107            "backchannel_providers",
108            "backchannel_providers_obj",
109            "launch_url",
110            "open_in_new_tab",
111            "meta_launch_url",
112            "meta_icon",
113            "meta_icon_url",
114            "meta_icon_themed_urls",
115            "meta_description",
116            "meta_publisher",
117            "policy_engine_mode",
118            "group",
119        ]
120        extra_kwargs = {
121            "backchannel_providers": {"required": False},
122        }
123
124
125class ApplicationViewSet(UsedByMixin, ModelViewSet):
126    """Application Viewset"""
127
128    queryset = (
129        Application.objects.all()
130        .with_provider()
131        .prefetch_related("policies")
132        .prefetch_related("backchannel_providers")
133    )
134    serializer_class = ApplicationSerializer
135    search_fields = [
136        "name",
137        "slug",
138        "meta_launch_url",
139        "meta_description",
140        "meta_publisher",
141        "group",
142    ]
143    filterset_fields = [
144        "name",
145        "slug",
146        "meta_launch_url",
147        "meta_description",
148        "meta_publisher",
149        "group",
150    ]
151    lookup_field = "slug"
152    ordering = ["name"]
153
154    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
155        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
156        for backend in list(self.filter_backends):
157            if backend == ObjectFilter:
158                continue
159            queryset = backend().filter_queryset(self.request, queryset, self)
160        return queryset
161
162    def _get_allowed_applications(
163        self, paginated_apps: Iterator[Application], user: User | None = None
164    ) -> list[Application]:
165        applications = []
166        request = self.request._request
167        if user:
168            request = copy(request)
169            request.user = user
170        for application in paginated_apps:
171            engine = PolicyEngine(application, request.user, request)
172            engine.empty_result = AppAccessWithoutBindings.get()
173            engine.build()
174            if engine.passing:
175                applications.append(application)
176        return applications
177
178    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
179        """
180        Re-fetch with proper prefetching for serialization
181        Cached applications don't have prefetched relationships, causing N+1 queries
182        during serialization when get_provider() is called
183        """
184        if not applications:
185            return self.get_queryset().none()
186        pks = [app.pk for app in applications]
187        return (
188            self.get_queryset()
189            .filter(pk__in=pks)
190            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
191        )
192
193    def _filter_applications_with_launch_url(
194        self, paginated_apps: QuerySet[Application]
195    ) -> list[Application]:
196        applications = []
197        for app in paginated_apps:
198            if app.get_launch_url():
199                applications.append(app)
200        return applications
201
202    @extend_schema(
203        parameters=[
204            OpenApiParameter(
205                name="for_user",
206                location=OpenApiParameter.QUERY,
207                type=OpenApiTypes.INT,
208            )
209        ],
210        responses={
211            200: PolicyTestResultSerializer(),
212        },
213    )
214    @action(detail=True, methods=["GET"])
215    def check_access(self, request: Request, slug: str) -> Response:
216        """Check access to a single application by slug"""
217        # Don't use self.get_object as that checks for view_application permission
218        # which the user might not have, even if they have access
219        application = get_object_or_404(Application, slug=slug)
220        # If the current user is superuser, they can set `for_user`
221        for_user = request.user
222        if request.user.is_superuser and "for_user" in request.query_params:
223            try:
224                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
225            except ValueError:
226                raise ValidationError({"for_user": "for_user must be numerical"}) from None
227            if not for_user:
228                raise ValidationError({"for_user": "User not found"})
229        engine = PolicyEngine(application, for_user, request)
230        engine.empty_result = AppAccessWithoutBindings.get()
231        engine.use_cache = False
232        with capture_logs() as logs:
233            engine.build()
234            result = engine.result
235        response = PolicyTestResultSerializer(PolicyResult(False))
236        if result.passing:
237            response = PolicyTestResultSerializer(PolicyResult(True))
238        if request.user.is_superuser:
239            log_messages = []
240            for log in logs:
241                if log.attributes.get("process", "") == "PolicyProcess":
242                    continue
243                log_messages.append(LogEventSerializer(log).data)
244            result.log_messages = log_messages
245            response = PolicyTestResultSerializer(result)
246        return Response(response.data)
247
248    @extend_schema(
249        parameters=[
250            OpenApiParameter(
251                name="superuser_full_list",
252                location=OpenApiParameter.QUERY,
253                type=OpenApiTypes.BOOL,
254            ),
255            OpenApiParameter(
256                name="for_user",
257                location=OpenApiParameter.QUERY,
258                type=OpenApiTypes.INT,
259            ),
260            OpenApiParameter(
261                name="only_with_launch_url",
262                location=OpenApiParameter.QUERY,
263                type=OpenApiTypes.BOOL,
264            ),
265        ]
266    )
267    def list(self, request: Request) -> Response:
268        """Custom list method that checks Policy based access instead of guardian"""
269        should_cache = request.query_params.get("search", "") == ""
270
271        superuser_full_list = (
272            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
273        )
274        if superuser_full_list and request.user.is_superuser:
275            return super().list(request)
276
277        only_with_launch_url = str(
278            request.query_params.get("only_with_launch_url", "false")
279        ).lower()
280
281        queryset = self._filter_queryset_for_list(self.get_queryset())
282        paginator: Pagination = self.paginator
283        paginated_apps = paginator.paginate_queryset(queryset, request)
284
285        if "for_user" in request.query_params:
286            try:
287                for_user: int = int(request.query_params.get("for_user", 0))
288                for_user = (
289                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
290                    .filter(pk=for_user)
291                    .first()
292                )
293                if not for_user:
294                    raise ValidationError({"for_user": "User not found"})
295            except ValueError as exc:
296                raise ValidationError from exc
297            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
298            allowed_applications = self._expand_applications(allowed_applications)
299
300            serializer = self.get_serializer(allowed_applications, many=True)
301            return self.get_paginated_response(serializer.data)
302
303        allowed_applications = []
304        if not should_cache:
305            allowed_applications = self._get_allowed_applications(paginated_apps)
306        if should_cache:
307            allowed_applications = cache.get(
308                user_app_cache_key(self.request.user.pk, paginator.page.number)
309            )
310            if not allowed_applications:
311                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
312                allowed_applications = self._get_allowed_applications(paginated_apps)
313                cache.set(
314                    user_app_cache_key(self.request.user.pk, paginator.page.number),
315                    allowed_applications,
316                    timeout=86400,
317                )
318        allowed_applications = self._expand_applications(allowed_applications)
319
320        if only_with_launch_url == "true":
321            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)
322
323        serializer = self.get_serializer(allowed_applications, many=True)
324        return self.get_paginated_response(serializer.data)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
40def user_app_cache_key(user_pk: str, page_number: int | None = None) -> str:
41    """Cache key where application list for user is saved"""
42    key = f"{CACHE_PREFIX}app_access/{user_pk}"
43    if page_number:
44        key += f"/{page_number}"
45    return key

Cache key where application list for user is saved

class ApplicationSerializer(authentik.core.api.utils.ModelSerializer):
 48class ApplicationSerializer(ModelSerializer):
 49    """Application Serializer"""
 50
 51    launch_url = SerializerMethodField()
 52    provider_obj = ProviderSerializer(
 53        source="get_provider",
 54        required=False,
 55        read_only=True,
 56        allow_null=True,
 57    )
 58    backchannel_providers_obj = ProviderSerializer(
 59        source="backchannel_providers", required=False, read_only=True, many=True
 60    )
 61
 62    meta_icon_url = ReadOnlyField(source="get_meta_icon")
 63    meta_icon_themed_urls = ThemedUrlsSerializer(
 64        source="get_meta_icon_themed_urls", read_only=True, allow_null=True
 65    )
 66
 67    def get_launch_url(self, app: Application) -> str | None:
 68        """Allow formatting of launch URL"""
 69        user = None
 70        user_data = None
 71
 72        if "request" in self.context:
 73            user = self.context["request"].user
 74
 75        # Cache serialized user data to avoid N+1 when formatting launch URLs
 76        # for multiple applications. UserSerializer accesses user.groups which
 77        # would otherwise trigger a query for each application.
 78        if user is not None:
 79            if "_cached_user_data" not in self.context:
 80                # Prefetch groups to avoid N+1
 81                self.context["_cached_user_data"] = UserSerializer(instance=user).data
 82            user_data = self.context["_cached_user_data"]
 83
 84        return app.get_launch_url(user, user_data=user_data)
 85
 86    def validate_slug(self, slug: str) -> str:
 87        if slug in Application.reserved_slugs:
 88            raise ValidationError(
 89                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
 90                    slug=slug
 91                )
 92            )
 93        return slug
 94
 95    def __init__(self, *args, **kwargs) -> None:
 96        super().__init__(*args, **kwargs)
 97        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 98            self.fields["icon"] = CharField(source="meta_icon", required=False)
 99
100    class Meta:
101        model = Application
102        fields = [
103            "pk",
104            "name",
105            "slug",
106            "provider",
107            "provider_obj",
108            "backchannel_providers",
109            "backchannel_providers_obj",
110            "launch_url",
111            "open_in_new_tab",
112            "meta_launch_url",
113            "meta_icon",
114            "meta_icon_url",
115            "meta_icon_themed_urls",
116            "meta_description",
117            "meta_publisher",
118            "policy_engine_mode",
119            "group",
120        ]
121        extra_kwargs = {
122            "backchannel_providers": {"required": False},
123        }

Application Serializer

ApplicationSerializer(*args, **kwargs)
95    def __init__(self, *args, **kwargs) -> None:
96        super().__init__(*args, **kwargs)
97        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
98            self.fields["icon"] = CharField(source="meta_icon", required=False)
launch_url
provider_obj
backchannel_providers_obj
meta_icon_url
meta_icon_themed_urls
def get_launch_url(self, app: authentik.core.models.Application) -> str | None:
67    def get_launch_url(self, app: Application) -> str | None:
68        """Allow formatting of launch URL"""
69        user = None
70        user_data = None
71
72        if "request" in self.context:
73            user = self.context["request"].user
74
75        # Cache serialized user data to avoid N+1 when formatting launch URLs
76        # for multiple applications. UserSerializer accesses user.groups which
77        # would otherwise trigger a query for each application.
78        if user is not None:
79            if "_cached_user_data" not in self.context:
80                # Prefetch groups to avoid N+1
81                self.context["_cached_user_data"] = UserSerializer(instance=user).data
82            user_data = self.context["_cached_user_data"]
83
84        return app.get_launch_url(user, user_data=user_data)

Allow formatting of launch URL

def validate_slug(self, slug: str) -> str:
86    def validate_slug(self, slug: str) -> str:
87        if slug in Application.reserved_slugs:
88            raise ValidationError(
89                _("The slug '{slug}' is reserved and cannot be used for applications.").format(
90                    slug=slug
91                )
92            )
93        return slug
class ApplicationSerializer.Meta:
100    class Meta:
101        model = Application
102        fields = [
103            "pk",
104            "name",
105            "slug",
106            "provider",
107            "provider_obj",
108            "backchannel_providers",
109            "backchannel_providers_obj",
110            "launch_url",
111            "open_in_new_tab",
112            "meta_launch_url",
113            "meta_icon",
114            "meta_icon_url",
115            "meta_icon_themed_urls",
116            "meta_description",
117            "meta_publisher",
118            "policy_engine_mode",
119            "group",
120        ]
121        extra_kwargs = {
122            "backchannel_providers": {"required": False},
123        }
fields = ['pk', 'name', 'slug', 'provider', 'provider_obj', 'backchannel_providers', 'backchannel_providers_obj', 'launch_url', 'open_in_new_tab', 'meta_launch_url', 'meta_icon', 'meta_icon_url', 'meta_icon_themed_urls', 'meta_description', 'meta_publisher', 'policy_engine_mode', 'group']
extra_kwargs = {'backchannel_providers': {'required': False}}
class ApplicationViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
126class ApplicationViewSet(UsedByMixin, ModelViewSet):
127    """Application Viewset"""
128
129    queryset = (
130        Application.objects.all()
131        .with_provider()
132        .prefetch_related("policies")
133        .prefetch_related("backchannel_providers")
134    )
135    serializer_class = ApplicationSerializer
136    search_fields = [
137        "name",
138        "slug",
139        "meta_launch_url",
140        "meta_description",
141        "meta_publisher",
142        "group",
143    ]
144    filterset_fields = [
145        "name",
146        "slug",
147        "meta_launch_url",
148        "meta_description",
149        "meta_publisher",
150        "group",
151    ]
152    lookup_field = "slug"
153    ordering = ["name"]
154
155    def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
156        """Custom filter_queryset method which ignores guardian, but still supports sorting"""
157        for backend in list(self.filter_backends):
158            if backend == ObjectFilter:
159                continue
160            queryset = backend().filter_queryset(self.request, queryset, self)
161        return queryset
162
163    def _get_allowed_applications(
164        self, paginated_apps: Iterator[Application], user: User | None = None
165    ) -> list[Application]:
166        applications = []
167        request = self.request._request
168        if user:
169            request = copy(request)
170            request.user = user
171        for application in paginated_apps:
172            engine = PolicyEngine(application, request.user, request)
173            engine.empty_result = AppAccessWithoutBindings.get()
174            engine.build()
175            if engine.passing:
176                applications.append(application)
177        return applications
178
179    def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
180        """
181        Re-fetch with proper prefetching for serialization
182        Cached applications don't have prefetched relationships, causing N+1 queries
183        during serialization when get_provider() is called
184        """
185        if not applications:
186            return self.get_queryset().none()
187        pks = [app.pk for app in applications]
188        return (
189            self.get_queryset()
190            .filter(pk__in=pks)
191            .order_by(Case(*[When(pk=pk, then=pos) for pos, pk in enumerate(pks)]))
192        )
193
194    def _filter_applications_with_launch_url(
195        self, paginated_apps: QuerySet[Application]
196    ) -> list[Application]:
197        applications = []
198        for app in paginated_apps:
199            if app.get_launch_url():
200                applications.append(app)
201        return applications
202
203    @extend_schema(
204        parameters=[
205            OpenApiParameter(
206                name="for_user",
207                location=OpenApiParameter.QUERY,
208                type=OpenApiTypes.INT,
209            )
210        ],
211        responses={
212            200: PolicyTestResultSerializer(),
213        },
214    )
215    @action(detail=True, methods=["GET"])
216    def check_access(self, request: Request, slug: str) -> Response:
217        """Check access to a single application by slug"""
218        # Don't use self.get_object as that checks for view_application permission
219        # which the user might not have, even if they have access
220        application = get_object_or_404(Application, slug=slug)
221        # If the current user is superuser, they can set `for_user`
222        for_user = request.user
223        if request.user.is_superuser and "for_user" in request.query_params:
224            try:
225                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
226            except ValueError:
227                raise ValidationError({"for_user": "for_user must be numerical"}) from None
228            if not for_user:
229                raise ValidationError({"for_user": "User not found"})
230        engine = PolicyEngine(application, for_user, request)
231        engine.empty_result = AppAccessWithoutBindings.get()
232        engine.use_cache = False
233        with capture_logs() as logs:
234            engine.build()
235            result = engine.result
236        response = PolicyTestResultSerializer(PolicyResult(False))
237        if result.passing:
238            response = PolicyTestResultSerializer(PolicyResult(True))
239        if request.user.is_superuser:
240            log_messages = []
241            for log in logs:
242                if log.attributes.get("process", "") == "PolicyProcess":
243                    continue
244                log_messages.append(LogEventSerializer(log).data)
245            result.log_messages = log_messages
246            response = PolicyTestResultSerializer(result)
247        return Response(response.data)
248
249    @extend_schema(
250        parameters=[
251            OpenApiParameter(
252                name="superuser_full_list",
253                location=OpenApiParameter.QUERY,
254                type=OpenApiTypes.BOOL,
255            ),
256            OpenApiParameter(
257                name="for_user",
258                location=OpenApiParameter.QUERY,
259                type=OpenApiTypes.INT,
260            ),
261            OpenApiParameter(
262                name="only_with_launch_url",
263                location=OpenApiParameter.QUERY,
264                type=OpenApiTypes.BOOL,
265            ),
266        ]
267    )
268    def list(self, request: Request) -> Response:
269        """Custom list method that checks Policy based access instead of guardian"""
270        should_cache = request.query_params.get("search", "") == ""
271
272        superuser_full_list = (
273            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
274        )
275        if superuser_full_list and request.user.is_superuser:
276            return super().list(request)
277
278        only_with_launch_url = str(
279            request.query_params.get("only_with_launch_url", "false")
280        ).lower()
281
282        queryset = self._filter_queryset_for_list(self.get_queryset())
283        paginator: Pagination = self.paginator
284        paginated_apps = paginator.paginate_queryset(queryset, request)
285
286        if "for_user" in request.query_params:
287            try:
288                for_user: int = int(request.query_params.get("for_user", 0))
289                for_user = (
290                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
291                    .filter(pk=for_user)
292                    .first()
293                )
294                if not for_user:
295                    raise ValidationError({"for_user": "User not found"})
296            except ValueError as exc:
297                raise ValidationError from exc
298            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
299            allowed_applications = self._expand_applications(allowed_applications)
300
301            serializer = self.get_serializer(allowed_applications, many=True)
302            return self.get_paginated_response(serializer.data)
303
304        allowed_applications = []
305        if not should_cache:
306            allowed_applications = self._get_allowed_applications(paginated_apps)
307        if should_cache:
308            allowed_applications = cache.get(
309                user_app_cache_key(self.request.user.pk, paginator.page.number)
310            )
311            if not allowed_applications:
312                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
313                allowed_applications = self._get_allowed_applications(paginated_apps)
314                cache.set(
315                    user_app_cache_key(self.request.user.pk, paginator.page.number),
316                    allowed_applications,
317                    timeout=86400,
318                )
319        allowed_applications = self._expand_applications(allowed_applications)
320
321        if only_with_launch_url == "true":
322            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)
323
324        serializer = self.get_serializer(allowed_applications, many=True)
325        return self.get_paginated_response(serializer.data)

Application Viewset

queryset = <ApplicationQuerySet []>
serializer_class = <class 'ApplicationSerializer'>
search_fields = ['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
filterset_fields = ['name', 'slug', 'meta_launch_url', 'meta_description', 'meta_publisher', 'group']
lookup_field = 'slug'
ordering = ['name']
@extend_schema(parameters=[OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT)], responses={200: PolicyTestResultSerializer()})
@action(detail=True, methods=['GET'])
def check_access( self, request: rest_framework.request.Request, slug: str) -> rest_framework.response.Response:
203    @extend_schema(
204        parameters=[
205            OpenApiParameter(
206                name="for_user",
207                location=OpenApiParameter.QUERY,
208                type=OpenApiTypes.INT,
209            )
210        ],
211        responses={
212            200: PolicyTestResultSerializer(),
213        },
214    )
215    @action(detail=True, methods=["GET"])
216    def check_access(self, request: Request, slug: str) -> Response:
217        """Check access to a single application by slug"""
218        # Don't use self.get_object as that checks for view_application permission
219        # which the user might not have, even if they have access
220        application = get_object_or_404(Application, slug=slug)
221        # If the current user is superuser, they can set `for_user`
222        for_user = request.user
223        if request.user.is_superuser and "for_user" in request.query_params:
224            try:
225                for_user = User.objects.filter(pk=request.query_params.get("for_user")).first()
226            except ValueError:
227                raise ValidationError({"for_user": "for_user must be numerical"}) from None
228            if not for_user:
229                raise ValidationError({"for_user": "User not found"})
230        engine = PolicyEngine(application, for_user, request)
231        engine.empty_result = AppAccessWithoutBindings.get()
232        engine.use_cache = False
233        with capture_logs() as logs:
234            engine.build()
235            result = engine.result
236        response = PolicyTestResultSerializer(PolicyResult(False))
237        if result.passing:
238            response = PolicyTestResultSerializer(PolicyResult(True))
239        if request.user.is_superuser:
240            log_messages = []
241            for log in logs:
242                if log.attributes.get("process", "") == "PolicyProcess":
243                    continue
244                log_messages.append(LogEventSerializer(log).data)
245            result.log_messages = log_messages
246            response = PolicyTestResultSerializer(result)
247        return Response(response.data)

Check access to a single application by slug

@extend_schema(parameters=[OpenApiParameter(name='superuser_full_list', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL), OpenApiParameter(name='for_user', location=OpenApiParameter.QUERY, type=OpenApiTypes.INT), OpenApiParameter(name='only_with_launch_url', location=OpenApiParameter.QUERY, type=OpenApiTypes.BOOL)])
def list( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
249    @extend_schema(
250        parameters=[
251            OpenApiParameter(
252                name="superuser_full_list",
253                location=OpenApiParameter.QUERY,
254                type=OpenApiTypes.BOOL,
255            ),
256            OpenApiParameter(
257                name="for_user",
258                location=OpenApiParameter.QUERY,
259                type=OpenApiTypes.INT,
260            ),
261            OpenApiParameter(
262                name="only_with_launch_url",
263                location=OpenApiParameter.QUERY,
264                type=OpenApiTypes.BOOL,
265            ),
266        ]
267    )
268    def list(self, request: Request) -> Response:
269        """Custom list method that checks Policy based access instead of guardian"""
270        should_cache = request.query_params.get("search", "") == ""
271
272        superuser_full_list = (
273            str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
274        )
275        if superuser_full_list and request.user.is_superuser:
276            return super().list(request)
277
278        only_with_launch_url = str(
279            request.query_params.get("only_with_launch_url", "false")
280        ).lower()
281
282        queryset = self._filter_queryset_for_list(self.get_queryset())
283        paginator: Pagination = self.paginator
284        paginated_apps = paginator.paginate_queryset(queryset, request)
285
286        if "for_user" in request.query_params:
287            try:
288                for_user: int = int(request.query_params.get("for_user", 0))
289                for_user = (
290                    get_objects_for_user(request.user, "authentik_core.view_user_applications")
291                    .filter(pk=for_user)
292                    .first()
293                )
294                if not for_user:
295                    raise ValidationError({"for_user": "User not found"})
296            except ValueError as exc:
297                raise ValidationError from exc
298            allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)
299            allowed_applications = self._expand_applications(allowed_applications)
300
301            serializer = self.get_serializer(allowed_applications, many=True)
302            return self.get_paginated_response(serializer.data)
303
304        allowed_applications = []
305        if not should_cache:
306            allowed_applications = self._get_allowed_applications(paginated_apps)
307        if should_cache:
308            allowed_applications = cache.get(
309                user_app_cache_key(self.request.user.pk, paginator.page.number)
310            )
311            if not allowed_applications:
312                LOGGER.debug("Caching allowed application list", page=paginator.page.number)
313                allowed_applications = self._get_allowed_applications(paginated_apps)
314                cache.set(
315                    user_app_cache_key(self.request.user.pk, paginator.page.number),
316                    allowed_applications,
317                    timeout=86400,
318                )
319        allowed_applications = self._expand_applications(allowed_applications)
320
321        if only_with_launch_url == "true":
322            allowed_applications = self._filter_applications_with_launch_url(allowed_applications)
323
324        serializer = self.get_serializer(allowed_applications, many=True)
325        return self.get_paginated_response(serializer.data)

Custom list method that checks Policy based access instead of guardian

name = None
description = None
suffix = None
detail = None
basename = None