authentik.rbac.decorators

API Decorators

 1"""API Decorators"""
 2
 3from collections.abc import Callable
 4from functools import wraps
 5
 6from rest_framework.request import Request
 7from rest_framework.response import Response
 8from rest_framework.viewsets import ModelViewSet
 9from structlog.stdlib import get_logger
10
11LOGGER = get_logger()
12
13
def permission_required(obj_perm: str | None = None, global_perms: list[str] | None = None):
    """Build a decorator that permission-checks a single custom viewset action.

    Args:
        obj_perm: Permission name checked first without an object and, if
            that check fails, against the object returned by
            ``self.get_object()``. Skipped when falsy.
        global_perms: Permission names that must each pass
            ``request.user.has_perm`` without an object. Skipped when falsy.

    Returns:
        A decorator for a viewset method taking ``(self, request, *args,
        **kwargs)``. The decorated method runs only after every configured
        check has passed; a failed check is handed to
        ``self.permission_denied(request)``.
    """

    def _enforce_object_perm(view: ModelViewSet, request: Request):
        # Holding the permission globally takes priority, so the object is
        # only looked up when the global check fails.
        if request.user.has_perm(obj_perm):
            return
        target = view.get_object()
        if request.user.has_perm(obj_perm, target):
            return
        LOGGER.debug("denying access for object", user=request.user, perm=obj_perm, obj=target)
        view.permission_denied(request)

    def decorator(func: Callable):
        """Wrap ``func`` so the permission checks run before it is called"""

        @wraps(func)
        def guarded(self: ModelViewSet, request: Request, *args, **kwargs) -> Response:
            if obj_perm:
                _enforce_object_perm(self, request)
            if global_perms:
                for required in global_perms:
                    if request.user.has_perm(required):
                        continue
                    # Stop at the first missing permission.
                    LOGGER.debug("denying access for other", user=request.user, perm=required)
                    return self.permission_denied(request)
            return func(self, request, *args, **kwargs)

        return guarded

    return decorator
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def permission_required(obj_perm: str | None = None, global_perms: list[str] | None = None):
def permission_required(obj_perm: str | None = None, global_perms: list[str] | None = None):
    """Build a decorator that permission-checks a single custom viewset action.

    Args:
        obj_perm: Permission name checked first without an object and, if
            that check fails, against the object returned by
            ``self.get_object()``. Skipped when falsy.
        global_perms: Permission names that must each pass
            ``request.user.has_perm`` without an object. Skipped when falsy.

    Returns:
        A decorator for a viewset method taking ``(self, request, *args,
        **kwargs)``. The decorated method runs only after every configured
        check has passed; a failed check is handed to
        ``self.permission_denied(request)``.
    """

    def _enforce_object_perm(view: ModelViewSet, request: Request):
        # Holding the permission globally takes priority, so the object is
        # only looked up when the global check fails.
        if request.user.has_perm(obj_perm):
            return
        target = view.get_object()
        if request.user.has_perm(obj_perm, target):
            return
        LOGGER.debug("denying access for object", user=request.user, perm=obj_perm, obj=target)
        view.permission_denied(request)

    def decorator(func: Callable):
        """Wrap ``func`` so the permission checks run before it is called"""

        @wraps(func)
        def guarded(self: ModelViewSet, request: Request, *args, **kwargs) -> Response:
            if obj_perm:
                _enforce_object_perm(self, request)
            if global_perms:
                for required in global_perms:
                    if request.user.has_perm(required):
                        continue
                    # Stop at the first missing permission.
                    LOGGER.debug("denying access for other", user=request.user, perm=required)
                    return self.permission_denied(request)
            return func(self, request, *args, **kwargs)

        return guarded

    return decorator

Check permissions for a single custom action