authentik.rbac.api.roles

RBAC Roles

  1"""RBAC Roles"""
  2
  3from django.contrib.auth.models import Permission
  4from django.http import Http404
  5from django_filters.filters import AllValuesMultipleFilter, BooleanFilter, CharFilter, NumberFilter
  6from django_filters.filterset import FilterSet
  7from drf_spectacular.types import OpenApiTypes
  8from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_field
  9from guardian.shortcuts import get_objects_for_user
 10from rest_framework.decorators import action
 11from rest_framework.fields import (
 12    ChoiceField,
 13    IntegerField,
 14    ListField,
 15)
 16from rest_framework.permissions import IsAuthenticated
 17from rest_framework.request import Request
 18from rest_framework.response import Response
 19from rest_framework.viewsets import ModelViewSet
 20
 21from authentik.blueprints.api import ManagedSerializer
 22from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 23from authentik.core.api.used_by import UsedByMixin
 24from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 25from authentik.core.models import Group, User
 26from authentik.rbac.decorators import permission_required
 27from authentik.rbac.models import Role, get_permission_choices
 28
 29
 30class RoleSerializer(ManagedSerializer, ModelSerializer):
 31    """Role serializer"""
 32
 33    def __init__(self, *args, **kwargs):
 34        super().__init__(*args, **kwargs)
 35        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 36            self.fields["permissions"] = ListField(
 37                required=False, child=ChoiceField(choices=get_permission_choices())
 38            )
 39
 40    def create(self, validated_data: dict) -> Role:
 41        perms_qs = Permission.objects.filter(
 42            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
 43        ).values_list("content_type__app_label", "codename")
 44        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
 45
 46        instance: Role = super().create(validated_data)
 47        instance.assign_perms(perms_list)
 48
 49        return instance
 50
 51    def update(self, instance: Role, validated_data: dict) -> Role:
 52        perms_qs = Permission.objects.filter(
 53            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
 54        ).values_list("content_type__app_label", "codename")
 55        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
 56
 57        instance: Role = super().update(instance, validated_data)
 58        instance.assign_perms(perms_list)
 59
 60        return instance
 61
 62    class Meta:
 63        model = Role
 64        fields = ["pk", "name"]
 65
 66
 67class RoleFilterSet(FilterSet):
 68    """Filter for Role"""
 69
 70    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))
 71
 72    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")
 73
 74    inherited = BooleanFilter(
 75        method="filter_inherited",
 76        label="Include inherited roles (requires users or groups filter)",
 77    )
 78
 79    users = extend_schema_field(OpenApiTypes.INT)(
 80        NumberFilter(
 81            method="filter_users",
 82            label="Filter by user (use with inherited=true for all roles)",
 83        )
 84    )
 85
 86    groups = extend_schema_field(OpenApiTypes.UUID)(
 87        CharFilter(
 88            method="filter_groups",
 89            label="Filter by group (use with inherited=true for all roles)",
 90        )
 91    )
 92
 93    def filter_inherited(self, queryset, name, value):
 94        """This filter is handled by filter_users and filter_groups"""
 95        return queryset
 96
 97    def filter_users(self, queryset, name, value):
 98        """Filter roles by user, optionally including inherited roles"""
 99        user = User.objects.filter(pk=value).first()
100        if not user:
101            return queryset.none()
102
103        include_inherited = self.data.get("inherited", "").lower() == "true"
104        if include_inherited:
105            return user.all_roles()
106        return queryset.filter(users=user)
107
108    def filter_groups(self, queryset, name, value):
109        """Filter roles by group, optionally including inherited roles"""
110        group = Group.objects.filter(pk=value).first()
111        if not group:
112            return queryset.none()
113
114        include_inherited = self.data.get("inherited", "").lower() == "true"
115        if include_inherited:
116            return group.all_roles()
117        return queryset.filter(groups=group)
118
119    class Meta:
120        model = Role
121        fields = [
122            "name",
123            "managed",
124        ]
125
126
class RoleViewSet(UsedByMixin, ModelViewSet):
    """Role viewset

    Standard model viewset for roles plus two detail actions, ``add_user`` and
    ``remove_user``, that change the role's ``users`` relation.
    """

    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    search_fields = ["name"]
    ordering = ["name"]
    filterset_class = RoleFilterSet

    class UserAccountSerializerForRole(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    def _get_viewable_user(self, request: Request) -> User:
        """Return the user named by ``pk`` in the request body.

        Only users for which the requester holds ``authentik_core.view_user``
        are considered.

        Raises:
            Http404: the body has no valid integer ``pk``, or no such user is
                visible to the requester.
        """
        # Validate before touching the ORM: a non-numeric pk would otherwise
        # raise ValueError inside the query and surface as a server error.
        body = self.UserAccountSerializerForRole(data=request.data)
        if not body.is_valid():
            raise Http404
        user = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(pk=body.validated_data["pk"])
            .first()
        )
        if not user:
            raise Http404
        return user

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def add_user(self, request: Request, pk: str) -> Response:
        """Add user to role"""
        # Resolve the role first so a missing role is reported before the user lookup.
        role: Role = self.get_object()
        user = self._get_viewable_user(request)
        role.users.add(user)
        return Response(status=204)

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def remove_user(self, request: Request, pk: str) -> Response:
        """Remove user from role"""
        role: Role = self.get_object()
        user = self._get_viewable_user(request)
        role.users.remove(user)
        return Response(status=204)
class RoleSerializer(ManagedSerializer, ModelSerializer):
    """Role serializer

    Serializes ``pk`` and ``name``. When ``SERIALIZER_CONTEXT_BLUEPRINT`` is
    present in the serializer context, an optional ``permissions`` list field is
    added; ``create``/``update`` pop it from the validated data and hand the
    resolved permissions to ``Role.assign_perms``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The permissions field only exists when the blueprint context key is set.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["permissions"] = ListField(
                required=False, child=ChoiceField(choices=get_permission_choices())
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list:
        """Pop ``permissions`` from ``validated_data`` and resolve them.

        Returns the ``<app_label>.<codename>`` strings of the requested
        permissions that exist in the database. A missing or empty
        ``permissions`` entry yields an empty list.
        """
        # NOTE(review): assumes each entry is "<app_label>.<codename>", i.e. the
        # same format that is built below and passed to assign_perms - confirm
        # against get_permission_choices().
        requested = set(validated_data.pop("permissions", []))
        candidates = Permission.objects.filter(
            codename__in=[perm.split(".")[1] for perm in requested]
        ).values_list("content_type__app_label", "codename")
        # Codenames are only unique per content type, so the codename lookup can
        # also match permissions of other apps; keep exactly what was requested.
        return [
            f"{app_label}.{codename}"
            for app_label, codename in candidates
            if f"{app_label}.{codename}" in requested
        ]

    def create(self, validated_data: dict) -> Role:
        """Create the role, then assign the permissions given in ``permissions``."""
        # Must be popped before super().create(), which receives the remaining data.
        perms_list = self._pop_permissions(validated_data)

        instance: Role = super().create(validated_data)
        instance.assign_perms(perms_list)

        return instance

    def update(self, instance: Role, validated_data: dict) -> Role:
        """Update the role, then assign the permissions given in ``permissions``.

        This method only calls ``assign_perms``; it does not itself revoke
        permissions that are absent from the list.
        """
        perms_list = self._pop_permissions(validated_data)

        instance: Role = super().update(instance, validated_data)
        instance.assign_perms(perms_list)

        return instance

    class Meta:
        model = Role
        fields = ["pk", "name"]

Role serializer

RoleSerializer(*args, **kwargs)
34    def __init__(self, *args, **kwargs):
35        super().__init__(*args, **kwargs)
36        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
37            self.fields["permissions"] = ListField(
38                required=False, child=ChoiceField(choices=get_permission_choices())
39            )
def create(self, validated_data: dict) -> authentik.rbac.models.Role:
41    def create(self, validated_data: dict) -> Role:
42        perms_qs = Permission.objects.filter(
43            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
44        ).values_list("content_type__app_label", "codename")
45        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
46
47        instance: Role = super().create(validated_data)
48        instance.assign_perms(perms_list)
49
50        return instance

We have a bit of extra checking around this in order to provide descriptive messages when something goes wrong, but this method is essentially just:

return ExampleModel.objects.create(**validated_data)

If there are many to many fields present on the instance then they cannot be set until the model is instantiated, in which case the implementation is like so:

example_relationship = validated_data.pop('example_relationship')
instance = ExampleModel.objects.create(**validated_data)
instance.example_relationship = example_relationship
return instance

The default implementation also does not handle nested relationships. If you want to support writable nested relationships you'll need to write an explicit .create() method.

def update( self, instance: authentik.rbac.models.Role, validated_data: dict) -> authentik.rbac.models.Role:
52    def update(self, instance: Role, validated_data: dict) -> Role:
53        perms_qs = Permission.objects.filter(
54            codename__in=[x.split(".")[1] for x in validated_data.pop("permissions", [])]
55        ).values_list("content_type__app_label", "codename")
56        perms_list = [f"{ct}.{name}" for ct, name in list(perms_qs)]
57
58        instance: Role = super().update(instance, validated_data)
59        instance.assign_perms(perms_list)
60
61        return instance
class RoleSerializer.Meta:
    class Meta:
        # Model the serializer is bound to.
        model = Role
        # Model fields included in the serializer.
        fields = ["pk", "name"]
model = <class 'authentik.rbac.models.Role'>
fields = ['pk', 'name']
class RoleFilterSet(django_filters.filterset.FilterSet):
 68class RoleFilterSet(FilterSet):
 69    """Filter for Role"""
 70
 71    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))
 72
 73    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")
 74
 75    inherited = BooleanFilter(
 76        method="filter_inherited",
 77        label="Include inherited roles (requires users or groups filter)",
 78    )
 79
 80    users = extend_schema_field(OpenApiTypes.INT)(
 81        NumberFilter(
 82            method="filter_users",
 83            label="Filter by user (use with inherited=true for all roles)",
 84        )
 85    )
 86
 87    groups = extend_schema_field(OpenApiTypes.UUID)(
 88        CharFilter(
 89            method="filter_groups",
 90            label="Filter by group (use with inherited=true for all roles)",
 91        )
 92    )
 93
 94    def filter_inherited(self, queryset, name, value):
 95        """This filter is handled by filter_users and filter_groups"""
 96        return queryset
 97
 98    def filter_users(self, queryset, name, value):
 99        """Filter roles by user, optionally including inherited roles"""
100        user = User.objects.filter(pk=value).first()
101        if not user:
102            return queryset.none()
103
104        include_inherited = self.data.get("inherited", "").lower() == "true"
105        if include_inherited:
106            return user.all_roles()
107        return queryset.filter(users=user)
108
109    def filter_groups(self, queryset, name, value):
110        """Filter roles by group, optionally including inherited roles"""
111        group = Group.objects.filter(pk=value).first()
112        if not group:
113            return queryset.none()
114
115        include_inherited = self.data.get("inherited", "").lower() == "true"
116        if include_inherited:
117            return group.all_roles()
118        return queryset.filter(groups=group)
119
120    class Meta:
121        model = Role
122        fields = [
123            "name",
124            "managed",
125        ]

Filter for Role

managed
managed__isnull
inherited
users
groups
def filter_inherited(self, queryset, name, value):
94    def filter_inherited(self, queryset, name, value):
95        """This filter is handled by filter_users and filter_groups"""
96        return queryset

This filter is handled by filter_users and filter_groups

def filter_users(self, queryset, name, value):
 98    def filter_users(self, queryset, name, value):
 99        """Filter roles by user, optionally including inherited roles"""
100        user = User.objects.filter(pk=value).first()
101        if not user:
102            return queryset.none()
103
104        include_inherited = self.data.get("inherited", "").lower() == "true"
105        if include_inherited:
106            return user.all_roles()
107        return queryset.filter(users=user)

Filter roles by user, optionally including inherited roles

def filter_groups(self, queryset, name, value):
109    def filter_groups(self, queryset, name, value):
110        """Filter roles by group, optionally including inherited roles"""
111        group = Group.objects.filter(pk=value).first()
112        if not group:
113            return queryset.none()
114
115        include_inherited = self.data.get("inherited", "").lower() == "true"
116        if include_inherited:
117            return group.all_roles()
118        return queryset.filter(groups=group)

Filter roles by group, optionally including inherited roles

declared_filters = OrderedDict({'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>, 'inherited': <django_filters.filters.BooleanFilter object>, 'users': <django_filters.filters.NumberFilter object>, 'groups': <django_filters.filters.CharFilter object>})
base_filters = OrderedDict({'name': <django_filters.filters.CharFilter object>, 'managed': <django_filters.filters.AllValuesMultipleFilter object>, 'managed__isnull': <django_filters.filters.BooleanFilter object>, 'inherited': <django_filters.filters.BooleanFilter object>, 'users': <django_filters.filters.NumberFilter object>, 'groups': <django_filters.filters.CharFilter object>})
class RoleFilterSet.Meta:
    class Meta:
        # Model the filterset operates on.
        model = Role
        # Model fields exposed as filters; "managed" is additionally declared
        # explicitly on the filterset as an AllValuesMultipleFilter.
        fields = [
            "name",
            "managed",
        ]
model = <class 'authentik.rbac.models.Role'>
fields = ['name', 'managed']
class RoleViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class RoleViewSet(UsedByMixin, ModelViewSet):
    """Role viewset

    Standard model viewset for roles plus two detail actions, ``add_user`` and
    ``remove_user``, that change the role's ``users`` relation.
    """

    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    search_fields = ["name"]
    ordering = ["name"]
    filterset_class = RoleFilterSet

    class UserAccountSerializerForRole(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    def _get_viewable_user(self, request: Request) -> User:
        """Return the user named by ``pk`` in the request body.

        Only users for which the requester holds ``authentik_core.view_user``
        are considered.

        Raises:
            Http404: the body has no valid integer ``pk``, or no such user is
                visible to the requester.
        """
        # Validate before touching the ORM: a non-numeric pk would otherwise
        # raise ValueError inside the query and surface as a server error.
        body = self.UserAccountSerializerForRole(data=request.data)
        if not body.is_valid():
            raise Http404
        user = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(pk=body.validated_data["pk"])
            .first()
        )
        if not user:
            raise Http404
        return user

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def add_user(self, request: Request, pk: str) -> Response:
        """Add user to role"""
        # Resolve the role first so a missing role is reported before the user lookup.
        role: Role = self.get_object()
        user = self._get_viewable_user(request)
        role.users.add(user)
        return Response(status=204)

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def remove_user(self, request: Request, pk: str) -> Response:
        """Remove user from role"""
        role: Role = self.get_object()
        user = self._get_viewable_user(request)
        role.users.remove(user)
        return Response(status=204)

Role viewset

serializer_class = <class 'RoleSerializer'>
queryset = <QuerySet []>
search_fields = ['name']
ordering = ['name']
filterset_class = <class 'RoleFilterSet'>
@permission_required('authentik_rbac.change_role')
@extend_schema(request=UserAccountSerializerForRole, responses={204: OpenApiResponse(description='User added'), 404: OpenApiResponse(description='User not found')})
@action(detail=True, methods=['POST'], pagination_class=None, filter_backends=[], permission_classes=[IsAuthenticated])
def add_user( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
142    @permission_required("authentik_rbac.change_role")
143    @extend_schema(
144        request=UserAccountSerializerForRole,
145        responses={
146            204: OpenApiResponse(description="User added"),
147            404: OpenApiResponse(description="User not found"),
148        },
149    )
150    @action(
151        detail=True,
152        methods=["POST"],
153        pagination_class=None,
154        filter_backends=[],
155        permission_classes=[IsAuthenticated],
156    )
157    def add_user(self, request: Request, pk: str) -> Response:
158        """Add user to role"""
159        role: Role = self.get_object()
160        user: User = (
161            get_objects_for_user(request.user, "authentik_core.view_user")
162            .filter(
163                pk=request.data.get("pk"),
164            )
165            .first()
166        )
167        if not user:
168            raise Http404
169        role.users.add(user)
170        return Response(status=204)

Add user to role

@permission_required('authentik_rbac.change_role')
@extend_schema(request=UserAccountSerializerForRole, responses={204: OpenApiResponse(description='User removed'), 404: OpenApiResponse(description='User not found')})
@action(detail=True, methods=['POST'], pagination_class=None, filter_backends=[], permission_classes=[IsAuthenticated])
def remove_user( self, request: rest_framework.request.Request, pk: str) -> rest_framework.response.Response:
172    @permission_required("authentik_rbac.change_role")
173    @extend_schema(
174        request=UserAccountSerializerForRole,
175        responses={
176            204: OpenApiResponse(description="User removed"),
177            404: OpenApiResponse(description="User not found"),
178        },
179    )
180    @action(
181        detail=True,
182        methods=["POST"],
183        pagination_class=None,
184        filter_backends=[],
185        permission_classes=[IsAuthenticated],
186    )
187    def remove_user(self, request: Request, pk: str) -> Response:
188        """Remove user from role"""
189        role: Role = self.get_object()
190        user: User = (
191            get_objects_for_user(request.user, "authentik_core.view_user")
192            .filter(
193                pk=request.data.get("pk"),
194            )
195            .first()
196        )
197        if not user:
198            raise Http404
199        role.users.remove(user)
200        return Response(status=204)

Remove user from role

name = None
description = None
suffix = None
detail = None
basename = None
class RoleViewSet.UserAccountSerializerForRole(authentik.core.api.utils.PassiveSerializer):
    class UserAccountSerializerForRole(PassiveSerializer):
        """Account adding/removing operations

        Request body schema for the ``add_user`` and ``remove_user`` actions.
        """

        # Primary key of the user to add to or remove from the role.
        pk = IntegerField(required=True)

Account adding/removing operations

pk