authentik.rbac.api.roles
RBAC Roles
"""RBAC Roles"""

from django.contrib.auth.models import Permission
from django.http import Http404
from django_filters.filters import AllValuesMultipleFilter, BooleanFilter, CharFilter, NumberFilter
from django_filters.filterset import FilterSet
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema, extend_schema_field
from guardian.shortcuts import get_objects_for_user
from rest_framework.decorators import action
from rest_framework.fields import (
    ChoiceField,
    IntegerField,
    ListField,
)
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet

from authentik.blueprints.api import ManagedSerializer
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import Group, User
from authentik.rbac.decorators import permission_required
from authentik.rbac.models import Role, get_permission_choices


class RoleSerializer(ManagedSerializer, ModelSerializer):
    """Role serializer"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The writable "permissions" list is only declared when
        # SERIALIZER_CONTEXT_BLUEPRINT is present in the serializer context.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["permissions"] = ListField(
                required=False, child=ChoiceField(choices=get_permission_choices())
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list[str]:
        """Pop "permissions" from validated_data and resolve it against the database.

        Returns the "app_label.codename" names of existing Permission rows that
        were explicitly requested. The database lookup is by codename only, so
        the result is narrowed to the requested full names afterwards; otherwise
        a permission from a different app sharing the same codename would be
        assigned as well.
        """
        # assumes entries look like "<app_label>.<codename>" (the shape rebuilt
        # below) -- TODO confirm against get_permission_choices()
        requested = set(validated_data.pop("permissions", []))
        matches = Permission.objects.filter(
            codename__in=[perm.partition(".")[2] for perm in requested]
        ).values_list("content_type__app_label", "codename")
        return [
            f"{app_label}.{codename}"
            for app_label, codename in matches
            if f"{app_label}.{codename}" in requested
        ]

    def create(self, validated_data: dict) -> Role:
        """Create the role, then assign the requested permissions to it."""
        # "permissions" must be removed before delegating to the parent create().
        perms_list = self._pop_permissions(validated_data)

        instance: Role = super().create(validated_data)
        instance.assign_perms(perms_list)

        return instance

    def update(self, instance: Role, validated_data: dict) -> Role:
        """Update the role, then assign the requested permissions to it."""
        perms_list = self._pop_permissions(validated_data)

        instance = super().update(instance, validated_data)
        instance.assign_perms(perms_list)

        return instance

    class Meta:
        model = Role
        fields = ["pk", "name"]


class RoleFilterSet(FilterSet):
    """Filter for Role"""

    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))

    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    inherited = BooleanFilter(
        method="filter_inherited",
        label="Include inherited roles (requires users or groups filter)",
    )

    users = extend_schema_field(OpenApiTypes.INT)(
        NumberFilter(
            method="filter_users",
            label="Filter by user (use with inherited=true for all roles)",
        )
    )

    groups = extend_schema_field(OpenApiTypes.UUID)(
        CharFilter(
            method="filter_groups",
            label="Filter by group (use with inherited=true for all roles)",
        )
    )

    def _inherited_requested(self) -> bool:
        """Return True when the raw "inherited" parameter is "true" (any letter case)."""
        return self.data.get("inherited", "").lower() == "true"

    def filter_inherited(self, queryset, name, value):
        """This filter is handled by filter_users and filter_groups"""
        return queryset

    def filter_users(self, queryset, name, value):
        """Filter roles by user, optionally including inherited roles"""
        user = User.objects.filter(pk=value).first()
        if not user:
            return queryset.none()

        if self._inherited_requested():
            # NOTE(review): this returns user.all_roles() as-is rather than
            # narrowing the incoming queryset -- confirm that is intended.
            return user.all_roles()
        return queryset.filter(users=user)

    def filter_groups(self, queryset, name, value):
        """Filter roles by group, optionally including inherited roles"""
        group = Group.objects.filter(pk=value).first()
        if not group:
            return queryset.none()

        if self._inherited_requested():
            # NOTE(review): this returns group.all_roles() as-is rather than
            # narrowing the incoming queryset -- confirm that is intended.
            return group.all_roles()
        return queryset.filter(groups=group)

    class Meta:
        model = Role
        fields = [
            "name",
            "managed",
        ]


class RoleViewSet(UsedByMixin, ModelViewSet):
    """Role viewset"""

    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    search_fields = ["name"]
    ordering = ["name"]
    filterset_class = RoleFilterSet

    class UserAccountSerializerForRole(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    def _visible_user_or_404(self, request: Request) -> User:
        """Return the user named by the request body's "pk", or raise Http404.

        Only users returned by get_objects_for_user() for the
        "authentik_core.view_user" permission are considered.
        """
        candidates = get_objects_for_user(request.user, "authentik_core.view_user")
        user = candidates.filter(pk=request.data.get("pk")).first()
        if not user:
            raise Http404
        return user

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def add_user(self, request: Request, pk: str) -> Response:
        """Add user to role"""
        # Resolve the role first, then the user, matching the documented 404 case.
        role: Role = self.get_object()
        user = self._visible_user_or_404(request)
        role.users.add(user)
        return Response(status=204)

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def remove_user(self, request: Request, pk: str) -> Response:
        """Remove user from role"""
        role: Role = self.get_object()
        user = self._visible_user_or_404(request)
        role.users.remove(user)
        return Response(status=204)
class RoleSerializer(ManagedSerializer, ModelSerializer):
    """Role serializer"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The writable "permissions" list is only declared when
        # SERIALIZER_CONTEXT_BLUEPRINT is present in the serializer context.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["permissions"] = ListField(
                required=False, child=ChoiceField(choices=get_permission_choices())
            )

    @staticmethod
    def _pop_permissions(validated_data: dict) -> list[str]:
        """Pop "permissions" from validated_data and resolve it against the database.

        Returns the "app_label.codename" names of existing Permission rows that
        were explicitly requested. The database lookup is by codename only, so
        the result is narrowed to the requested full names afterwards; otherwise
        a permission from a different app sharing the same codename would be
        assigned as well.
        """
        # assumes entries look like "<app_label>.<codename>" (the shape rebuilt
        # below) -- TODO confirm against get_permission_choices()
        requested = set(validated_data.pop("permissions", []))
        matches = Permission.objects.filter(
            codename__in=[perm.partition(".")[2] for perm in requested]
        ).values_list("content_type__app_label", "codename")
        return [
            f"{app_label}.{codename}"
            for app_label, codename in matches
            if f"{app_label}.{codename}" in requested
        ]

    def create(self, validated_data: dict) -> Role:
        """Create the role, then assign the requested permissions to it."""
        # "permissions" must be removed before delegating to the parent create().
        perms_list = self._pop_permissions(validated_data)

        instance: Role = super().create(validated_data)
        instance.assign_perms(perms_list)

        return instance

    def update(self, instance: Role, validated_data: dict) -> Role:
        """Update the role, then assign the requested permissions to it."""
        perms_list = self._pop_permissions(validated_data)

        instance = super().update(instance, validated_data)
        instance.assign_perms(perms_list)

        return instance

    class Meta:
        model = Role
        fields = ["pk", "name"]
Role serializer
def create(self, validated_data: dict) -> Role:
    """Create the role, then assign the requested permissions to it.

    "permissions" is popped from validated_data before the parent create()
    runs. Only Permission rows whose full "app_label.codename" name was
    requested are assigned: the database lookup is by codename alone, so
    same-named permissions from other apps are filtered out afterwards.
    """
    # assumes entries look like "<app_label>.<codename>" (the shape rebuilt
    # below) -- TODO confirm against get_permission_choices()
    requested = set(validated_data.pop("permissions", []))
    matches = Permission.objects.filter(
        codename__in=[perm.partition(".")[2] for perm in requested]
    ).values_list("content_type__app_label", "codename")
    perms_list = [
        f"{app_label}.{codename}"
        for app_label, codename in matches
        if f"{app_label}.{codename}" in requested
    ]

    instance: Role = super().create(validated_data)
    instance.assign_perms(perms_list)

    return instance
We have a bit of extra checking around this in order to provide descriptive messages when something goes wrong, but this method is essentially just:
return ExampleModel.objects.create(**validated_data)
If there are many-to-many fields present on the instance, then they cannot be set until the model is instantiated, in which case the implementation is like so:
example_relationship = validated_data.pop('example_relationship')
instance = ExampleModel.objects.create(**validated_data)
instance.example_relationship = example_relationship
return instance
The default implementation also does not handle nested relationships.
If you want to support writable nested relationships you'll need
to write an explicit .create() method.
def update(self, instance: Role, validated_data: dict) -> Role:
    """Update the role, then assign the requested permissions to it.

    "permissions" is popped from validated_data before the parent update()
    runs. Only Permission rows whose full "app_label.codename" name was
    requested are assigned: the database lookup is by codename alone, so
    same-named permissions from other apps are filtered out afterwards.
    """
    # assumes entries look like "<app_label>.<codename>" (the shape rebuilt
    # below) -- TODO confirm against get_permission_choices()
    requested = set(validated_data.pop("permissions", []))
    matches = Permission.objects.filter(
        codename__in=[perm.partition(".")[2] for perm in requested]
    ).values_list("content_type__app_label", "codename")
    perms_list = [
        f"{app_label}.{codename}"
        for app_label, codename in matches
        if f"{app_label}.{codename}" in requested
    ]

    instance = super().update(instance, validated_data)
    instance.assign_perms(perms_list)

    return instance
class RoleFilterSet(FilterSet):
    """Filter for Role"""

    managed = extend_schema_field(OpenApiTypes.STR)(AllValuesMultipleFilter(field_name="managed"))

    managed__isnull = BooleanFilter(field_name="managed", lookup_expr="isnull")

    inherited = BooleanFilter(
        method="filter_inherited",
        label="Include inherited roles (requires users or groups filter)",
    )

    users = extend_schema_field(OpenApiTypes.INT)(
        NumberFilter(
            method="filter_users",
            label="Filter by user (use with inherited=true for all roles)",
        )
    )

    groups = extend_schema_field(OpenApiTypes.UUID)(
        CharFilter(
            method="filter_groups",
            label="Filter by group (use with inherited=true for all roles)",
        )
    )

    def _inherited_requested(self) -> bool:
        """Return True when the raw "inherited" parameter is "true" (any letter case)."""
        return self.data.get("inherited", "").lower() == "true"

    def filter_inherited(self, queryset, name, value):
        """No-op: the "inherited" flag is consumed by filter_users and filter_groups."""
        return queryset

    def filter_users(self, queryset, name, value):
        """Restrict roles to those of the user with primary key `value`.

        Yields an empty queryset when no such user exists. With
        inherited=true the result of user.all_roles() is returned instead of
        a filter on the incoming queryset.
        """
        owner = User.objects.filter(pk=value).first()
        if owner:
            if self._inherited_requested():
                # NOTE(review): the incoming queryset is not used on this
                # branch -- confirm that is intended.
                return owner.all_roles()
            return queryset.filter(users=owner)
        return queryset.none()

    def filter_groups(self, queryset, name, value):
        """Restrict roles to those of the group with primary key `value`.

        Yields an empty queryset when no such group exists. With
        inherited=true the result of group.all_roles() is returned instead of
        a filter on the incoming queryset.
        """
        owner = Group.objects.filter(pk=value).first()
        if owner:
            if self._inherited_requested():
                # NOTE(review): the incoming queryset is not used on this
                # branch -- confirm that is intended.
                return owner.all_roles()
            return queryset.filter(groups=owner)
        return queryset.none()

    class Meta:
        model = Role
        fields = [
            "name",
            "managed",
        ]
Filter for Role
def filter_inherited(self, queryset, name, value):
    """Return the queryset untouched.

    The "inherited" flag does no filtering of its own; filter_users and
    filter_groups read it and act on it.
    """
    return queryset
This filter is handled by filter_users and filter_groups
def filter_users(self, queryset, name, value):
    """Restrict roles to those of the user with primary key `value`.

    Yields an empty queryset when no such user exists. When the raw
    "inherited" parameter equals "true" (any letter case), the result of
    user.all_roles() is returned instead of a filter on the incoming queryset.
    """
    owner = User.objects.filter(pk=value).first()
    if owner:
        if self.data.get("inherited", "").lower() == "true":
            # NOTE(review): the incoming queryset is not used on this branch --
            # confirm that is intended.
            return owner.all_roles()
        return queryset.filter(users=owner)
    return queryset.none()
Filter roles by user, optionally including inherited roles
def filter_groups(self, queryset, name, value):
    """Restrict roles to those of the group with primary key `value`.

    Yields an empty queryset when no such group exists. When the raw
    "inherited" parameter equals "true" (any letter case), the result of
    group.all_roles() is returned instead of a filter on the incoming queryset.
    """
    owner = Group.objects.filter(pk=value).first()
    if owner:
        if self.data.get("inherited", "").lower() == "true":
            # NOTE(review): the incoming queryset is not used on this branch --
            # confirm that is intended.
            return owner.all_roles()
        return queryset.filter(groups=owner)
    return queryset.none()
Filter roles by group, optionally including inherited roles
class RoleViewSet(UsedByMixin, ModelViewSet):
    """Role viewset"""

    serializer_class = RoleSerializer
    queryset = Role.objects.all()
    search_fields = ["name"]
    ordering = ["name"]
    filterset_class = RoleFilterSet

    class UserAccountSerializerForRole(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    def _visible_user_or_404(self, request: Request) -> User:
        """Return the user named by the request body's "pk", or raise Http404.

        Only users returned by get_objects_for_user() for the
        "authentik_core.view_user" permission are considered.
        """
        candidates = get_objects_for_user(request.user, "authentik_core.view_user")
        member = candidates.filter(pk=request.data.get("pk")).first()
        if not member:
            raise Http404
        return member

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def add_user(self, request: Request, pk: str) -> Response:
        """Add user to role"""
        # The role is resolved before the user, so a role lookup failure wins.
        target_role: Role = self.get_object()
        member = self._visible_user_or_404(request)
        target_role.users.add(member)
        return Response(status=204)

    @permission_required("authentik_rbac.change_role")
    @extend_schema(
        request=UserAccountSerializerForRole,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    def remove_user(self, request: Request, pk: str) -> Response:
        """Remove user from role"""
        # The role is resolved before the user, so a role lookup failure wins.
        target_role: Role = self.get_object()
        member = self._visible_user_or_404(request)
        target_role.users.remove(member)
        return Response(status=204)
Role viewset
@permission_required("authentik_rbac.change_role")
@extend_schema(
    request=UserAccountSerializerForRole,
    responses={
        204: OpenApiResponse(description="User added"),
        404: OpenApiResponse(description="User not found"),
    },
)
@action(
    detail=True,
    methods=["POST"],
    pagination_class=None,
    filter_backends=[],
    permission_classes=[IsAuthenticated],
)
def add_user(self, request: Request, pk: str) -> Response:
    """Add user to role"""
    # The role is resolved first; the user is then looked up by the "pk" in
    # the request body among the users get_objects_for_user() returns for the
    # "authentik_core.view_user" permission.
    target_role: Role = self.get_object()
    visible_users = get_objects_for_user(request.user, "authentik_core.view_user")
    member = visible_users.filter(pk=request.data.get("pk")).first()
    if not member:
        raise Http404
    target_role.users.add(member)
    return Response(status=204)
Add user to role
@permission_required("authentik_rbac.change_role")
@extend_schema(
    request=UserAccountSerializerForRole,
    responses={
        204: OpenApiResponse(description="User removed"),
        404: OpenApiResponse(description="User not found"),
    },
)
@action(
    detail=True,
    methods=["POST"],
    pagination_class=None,
    filter_backends=[],
    permission_classes=[IsAuthenticated],
)
def remove_user(self, request: Request, pk: str) -> Response:
    """Remove user from role"""
    # The role is resolved first; the user is then looked up by the "pk" in
    # the request body among the users get_objects_for_user() returns for the
    # "authentik_core.view_user" permission.
    target_role: Role = self.get_object()
    visible_users = get_objects_for_user(request.user, "authentik_core.view_user")
    member = visible_users.filter(pk=request.data.get("pk")).first()
    if not member:
        raise Http404
    target_role.users.remove(member)
    return Response(status=204)
Remove user from role
Inherited Members
class UserAccountSerializerForRole(PassiveSerializer):
    """Account adding/removing operations"""

    # Required integer; add_user and remove_user read the "pk" key from the
    # request body to look up the user to add to or remove from the role.
    pk = IntegerField(required=True)
Account adding/removing operations