authentik.core.api.groups
Groups API Viewset
"""Groups API Viewset"""

from json import loads

from django.db.models import Prefetch
from django.http import Http404
from django.utils.translation import gettext as _
from django_filters.filters import CharFilter, ModelMultipleChoiceFilter
from django_filters.filterset import FilterSet
from drf_spectacular.utils import (
    OpenApiParameter,
    OpenApiResponse,
    extend_schema,
    extend_schema_field,
)
from guardian.shortcuts import get_objects_for_user
from rest_framework.authentication import SessionAuthentication
from rest_framework.decorators import action
from rest_framework.fields import CharField, IntegerField, SerializerMethodField
from rest_framework.permissions import IsAuthenticated
from rest_framework.relations import PrimaryKeyRelatedField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import ListSerializer, ValidationError
from rest_framework.viewsets import ModelViewSet

from authentik.api.authentication import TokenAuthentication
from authentik.api.validation import validate
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
from authentik.core.models import Group, User
from authentik.endpoints.connectors.agent.auth import AgentAuth
from authentik.rbac.api.roles import RoleSerializer
from authentik.rbac.decorators import permission_required

# User model fields serialized by PartialUserSerializer; GroupViewSet.get_queryset also uses this
# list to restrict which columns are loaded when prefetching group members.
PARTIAL_USER_SERIALIZER_MODEL_FIELDS = [
    "pk",
    "username",
    "name",
    "is_active",
    "last_login",
    "email",
    "attributes",
]


class PartialUserSerializer(ModelSerializer):
    """Partial User Serializer, does not include child relations."""

    attributes = JSONDictField(required=False)
    uid = CharField(read_only=True)

    class Meta:
        model = User
        fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"]


class RelatedGroupSerializer(ModelSerializer):
    """Stripped down group serializer to show relevant children/parents for groups"""

    attributes = JSONDictField(required=False)

    class Meta:
        model = Group
        fields = [
            "pk",
            "name",
            "is_superuser",
            "attributes",
            "group_uuid",
        ]


class GroupSerializer(ModelSerializer):
    """Group Serializer

    Besides the plain model fields, the ``*_obj`` fields expose expanded related objects.
    Each expansion is switched by a query parameter of the request found in the serializer
    context (``include_users``, ``include_children``, ``include_parents``,
    ``include_inherited_roles``); a disabled expansion is serialized as ``None``.
    """

    attributes = JSONDictField(required=False)
    parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False)
    parents_obj = SerializerMethodField(allow_null=True)
    children_obj = SerializerMethodField(allow_null=True)
    users_obj = SerializerMethodField(allow_null=True)
    roles_obj = ListSerializer(
        child=RoleSerializer(),
        read_only=True,
        source="roles",
        required=False,
    )
    inherited_roles_obj = SerializerMethodField(allow_null=True)
    num_pk = IntegerField(read_only=True)

    def _query_flag(self, name: str, default: str) -> bool:
        """Read the boolean query parameter `name` from the request in the serializer context.

        Returns True when the context holds no request. Otherwise the parameter value
        (or `default` when it is absent) is compared case-insensitively against "true".
        """
        request: Request = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, default)).lower() == "true"

    @property
    def _should_include_users(self) -> bool:
        """Whether `users_obj` should be populated (query default: "true")."""
        return self._query_flag("include_users", "true")

    @property
    def _should_include_children(self) -> bool:
        """Whether `children_obj` should be populated (query default: "false")."""
        return self._query_flag("include_children", "false")

    @property
    def _should_include_parents(self) -> bool:
        """Whether `parents_obj` should be populated (query default: "false")."""
        return self._query_flag("include_parents", "false")

    @property
    def _should_include_inherited_roles(self) -> bool:
        """Whether `inherited_roles_obj` should be populated (query default: "false")."""
        return self._query_flag("include_inherited_roles", "false")

    @extend_schema_field(PartialUserSerializer(many=True))
    def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
        """Return the serialized members of the group, or None when users are not requested."""
        if not self._should_include_users:
            return None
        return PartialUserSerializer(instance.users, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        """Return the serialized child groups, or None when children are not requested."""
        if not self._should_include_children:
            return None
        return RelatedGroupSerializer(instance.children, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        """Return the serialized parent groups, or None when parents are not requested."""
        if not self._should_include_parents:
            return None
        return RelatedGroupSerializer(instance.parents, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_inherited_roles_obj(self, instance: Group) -> list | None:
        """Return only inherited roles from ancestor groups (excludes direct roles)"""
        if not self._should_include_inherited_roles:
            return None
        direct_role_pks = instance.roles.values_list("pk", flat=True)
        inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks)
        return RoleSerializer(inherited_roles, many=True).data

    def validate_is_superuser(self, superuser: bool):
        """Ensure that the user creating this group has permissions to set the superuser flag

        Returns the submitted value unchanged when it is accepted.

        Raises:
            ValidationError: the requesting user lacks the permission needed for the change.
        """
        request: Request = self.context.get("request", None)
        if not request:
            return superuser
        # If we're updating an instance, and the state hasn't changed, we don't need to check perms
        if self.instance and superuser == self.instance.is_superuser:
            return superuser
        user: User = request.user
        perm = (
            "authentik_core.enable_group_superuser"
            if superuser
            else "authentik_core.disable_group_superuser"
        )
        # Creating a group without the superuser flag needs no extra permission
        if self.instance or superuser:
            has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
            if not has_perm:
                # Translate the template first and interpolate afterwards, so that the
                # message ID looked up in the catalog is the constant string
                raise ValidationError(
                    _(
                        "User does not have permission to set "
                        "superuser status to {superuser_status}."
                    ).format_map({"superuser_status": superuser})
                )
        return superuser

    class Meta:
        model = Group
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "parents",
            "parents_obj",
            "users",
            "users_obj",
            "attributes",
            "roles",
            "roles_obj",
            "inherited_roles_obj",
            "children",
            "children_obj",
        ]
        extra_kwargs = {
            "users": {
                "default": list,
            },
            "children": {
                "required": False,
                "default": list,
            },
            "parents": {
                "required": False,
                "default": list,
            },
        }


class GroupFilter(FilterSet):
    """Filter for groups"""

    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    members_by_username = ModelMultipleChoiceFilter(
        field_name="users__username",
        to_field_name="username",
        queryset=User.objects.all(),
    )
    members_by_pk = ModelMultipleChoiceFilter(
        field_name="users",
        queryset=User.objects.all(),
    )

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args

        `value` is a JSON object; every key/value pair becomes an ``attributes__<key>``
        lookup on `queryset`.

        Raises:
            ValidationError: `value` is not valid JSON, or is not a JSON object.
        """
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(detail="filter: failed to parse JSON") from None
        if not isinstance(parsed, dict):
            raise ValidationError(detail="filter: value must be key:value mapping")
        lookups = {f"attributes__{key}": wanted for key, wanted in parsed.items()}
        try:
            filtered = queryset.filter(**lookups)
            # Run a cheap probe query so that a ValueError caused by the lookups surfaces
            # here instead of later, without loading every matching row
            filtered.exists()
        except ValueError:
            # Best effort: lookups the database layer rejects are ignored
            return queryset
        return filtered

    class Meta:
        model = Group
        fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]


class GroupViewSet(UsedByMixin, ModelViewSet):
    """Group Viewset"""

    class UserAccountSerializer(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    queryset = Group.objects.none()
    serializer_class = GroupSerializer
    search_fields = ["name", "is_superuser"]
    filterset_class = GroupFilter
    ordering = ["name"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        """Return the DjangoQL schema fields available for searching groups."""
        from djangoql.schema import BoolField, StrField

        from authentik.enterprise.search.fields import (
            JSONSearchField,
        )

        return [
            StrField(Group, "name"),
            BoolField(Group, "is_superuser", nullable=True),
            JSONSearchField(Group, "attributes"),
        ]

    def get_queryset(self):
        """Return all groups, prefetching only the relations the serializer will expand."""
        # A single serializer instance is enough to read all include_* flags of the request
        flags = self.serializer_class(context={"request": self.request})
        base_qs = Group.objects.all().prefetch_related("roles")

        if flags._should_include_users:
            # Only fetch fields needed by PartialUserSerializer to reduce DB load and
            # instantiation time
            users_qs = User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS)
        else:
            users_qs = User.objects.all().only("id")
        base_qs = base_qs.prefetch_related(Prefetch("users", queryset=users_qs))

        if flags._should_include_children:
            base_qs = base_qs.prefetch_related("children")

        if flags._should_include_parents:
            base_qs = base_qs.prefetch_related("parents")

        return base_qs

    def _get_viewable_user(self, request: Request, user_pk) -> User:
        """Return the user with primary key `user_pk` among the users `request.user` may view.

        Raises:
            Http404: no such user is among the objects the requesting user holds
                ``authentik_core.view_user`` on.
        """
        user: User = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(
                pk=user_pk,
            )
            .first()
        )
        if not user:
            raise Http404
        return user

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def list(self, request, *args, **kwargs):
        """List groups; overridden only to document the include_* query parameters."""
        return super().list(request, *args, **kwargs)

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def retrieve(self, request, *args, **kwargs):
        """Retrieve a group; overridden only to document the include_* query parameters."""
        return super().retrieve(request, *args, **kwargs)

    @permission_required("authentik_core.add_user_to_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Add user to group"""
        group: Group = self.get_object()
        user = self._get_viewable_user(request, body.validated_data.get("pk"))
        group.users.add(user)
        return Response(status=204)

    @permission_required("authentik_core.remove_user_from_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Remove user from group"""
        group: Group = self.get_object()
        user = self._get_viewable_user(request, body.validated_data.get("pk"))
        group.users.remove(user)
        return Response(status=204)
class PartialUserSerializer(ModelSerializer):
    """Partial User Serializer, does not include child relations.

    Exposes the fields listed in PARTIAL_USER_SERIALIZER_MODEL_FIELDS plus the
    read-only ``uid``.
    """

    uid = CharField(read_only=True)
    attributes = JSONDictField(required=False)

    class Meta:
        fields = [*PARTIAL_USER_SERIALIZER_MODEL_FIELDS, "uid"]
        model = User
Partial User Serializer, does not include child relations.
Inherited Members
class RelatedGroupSerializer(ModelSerializer):
    """Stripped down group serializer to show relevant children/parents for groups

    Only scalar group fields are exposed; no related objects are expanded.
    """

    attributes = JSONDictField(required=False)

    class Meta:
        fields = ["pk", "name", "is_superuser", "attributes", "group_uuid"]
        model = Group
Stripped down group serializer to show relevant children/parents for groups
Inherited Members
class Meta:
    # Serializer options: the model to serialize and the group fields to expose
    fields = ["pk", "name", "is_superuser", "attributes", "group_uuid"]
    model = Group
class GroupSerializer(ModelSerializer):
    """Group Serializer

    Besides the plain model fields, the ``*_obj`` fields expose expanded related objects.
    Each expansion is switched by a query parameter of the request found in the serializer
    context (``include_users``, ``include_children``, ``include_parents``,
    ``include_inherited_roles``); a disabled expansion is serialized as ``None``.
    """

    attributes = JSONDictField(required=False)
    parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False)
    parents_obj = SerializerMethodField(allow_null=True)
    children_obj = SerializerMethodField(allow_null=True)
    users_obj = SerializerMethodField(allow_null=True)
    roles_obj = ListSerializer(
        child=RoleSerializer(),
        read_only=True,
        source="roles",
        required=False,
    )
    inherited_roles_obj = SerializerMethodField(allow_null=True)
    num_pk = IntegerField(read_only=True)

    def _query_flag(self, name: str, default: str) -> bool:
        """Read the boolean query parameter `name` from the request in the serializer context.

        Returns True when the context holds no request. Otherwise the parameter value
        (or `default` when it is absent) is compared case-insensitively against "true".
        """
        request: Request = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, default)).lower() == "true"

    @property
    def _should_include_users(self) -> bool:
        """Whether `users_obj` should be populated (query default: "true")."""
        return self._query_flag("include_users", "true")

    @property
    def _should_include_children(self) -> bool:
        """Whether `children_obj` should be populated (query default: "false")."""
        return self._query_flag("include_children", "false")

    @property
    def _should_include_parents(self) -> bool:
        """Whether `parents_obj` should be populated (query default: "false")."""
        return self._query_flag("include_parents", "false")

    @property
    def _should_include_inherited_roles(self) -> bool:
        """Whether `inherited_roles_obj` should be populated (query default: "false")."""
        return self._query_flag("include_inherited_roles", "false")

    @extend_schema_field(PartialUserSerializer(many=True))
    def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
        """Return the serialized members of the group, or None when users are not requested."""
        if not self._should_include_users:
            return None
        return PartialUserSerializer(instance.users, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        """Return the serialized child groups, or None when children are not requested."""
        if not self._should_include_children:
            return None
        return RelatedGroupSerializer(instance.children, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        """Return the serialized parent groups, or None when parents are not requested."""
        if not self._should_include_parents:
            return None
        return RelatedGroupSerializer(instance.parents, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_inherited_roles_obj(self, instance: Group) -> list | None:
        """Return only inherited roles from ancestor groups (excludes direct roles)"""
        if not self._should_include_inherited_roles:
            return None
        direct_role_pks = instance.roles.values_list("pk", flat=True)
        inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks)
        return RoleSerializer(inherited_roles, many=True).data

    def validate_is_superuser(self, superuser: bool):
        """Ensure that the user creating this group has permissions to set the superuser flag

        Returns the submitted value unchanged when it is accepted.

        Raises:
            ValidationError: the requesting user lacks the permission needed for the change.
        """
        request: Request = self.context.get("request", None)
        if not request:
            return superuser
        # If we're updating an instance, and the state hasn't changed, we don't need to check perms
        if self.instance and superuser == self.instance.is_superuser:
            return superuser
        user: User = request.user
        perm = (
            "authentik_core.enable_group_superuser"
            if superuser
            else "authentik_core.disable_group_superuser"
        )
        # Creating a group without the superuser flag needs no extra permission
        if self.instance or superuser:
            has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
            if not has_perm:
                # Translate the template first and interpolate afterwards, so that the
                # message ID looked up in the catalog is the constant string
                raise ValidationError(
                    _(
                        "User does not have permission to set "
                        "superuser status to {superuser_status}."
                    ).format_map({"superuser_status": superuser})
                )
        return superuser

    class Meta:
        model = Group
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "parents",
            "parents_obj",
            "users",
            "users_obj",
            "attributes",
            "roles",
            "roles_obj",
            "inherited_roles_obj",
            "children",
            "children_obj",
        ]
        extra_kwargs = {
            "users": {
                "default": list,
            },
            "children": {
                "required": False,
                "default": list,
            },
            "parents": {
                "required": False,
                "default": list,
            },
        }
Group Serializer
@extend_schema_field(RoleSerializer(many=True))
def get_inherited_roles_obj(self, instance: Group) -> list | None:
    """Serialize the roles the group has only through inheritance.

    Roles assigned directly to the group are excluded from ``instance.all_roles()``.
    Returns None when inherited roles were not requested.
    """
    if self._should_include_inherited_roles:
        own_role_pks = instance.roles.values_list("pk", flat=True)
        inherited_only = instance.all_roles().exclude(pk__in=own_role_pks)
        return RoleSerializer(inherited_only, many=True).data
    return None
Return only inherited roles from ancestor groups (excludes direct roles)
def validate_is_superuser(self, superuser: bool):
    """Ensure that the user creating this group has permissions to set the superuser flag

    The check is skipped when the serializer context holds no request, when an existing
    instance already has the submitted value, and when a new group is created without
    the superuser flag. Returns the submitted value unchanged when it is accepted.

    Raises:
        ValidationError: the requesting user lacks the permission needed for the change.
    """
    request: Request = self.context.get("request", None)
    if not request:
        return superuser
    # If we're updating an instance, and the state hasn't changed, we don't need to check perms
    if self.instance and superuser == self.instance.is_superuser:
        return superuser
    user: User = request.user
    perm = (
        "authentik_core.enable_group_superuser"
        if superuser
        else "authentik_core.disable_group_superuser"
    )
    if self.instance or superuser:
        # The permission may be held globally or on this specific group
        has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
        if not has_perm:
            # Translate the template first and interpolate afterwards, so that the
            # message ID looked up in the catalog is the constant string
            raise ValidationError(
                _(
                    "User does not have permission to set "
                    "superuser status to {superuser_status}."
                ).format_map({"superuser_status": superuser})
            )
    return superuser
Ensure that the user creating this group has permissions to set the superuser flag
Inherited Members
class Meta:
    # Serializer options: model, exposed fields, and per-field overrides
    model = Group
    fields = [
        "pk", "num_pk", "name", "is_superuser",
        "parents", "parents_obj",
        "users", "users_obj",
        "attributes",
        "roles", "roles_obj", "inherited_roles_obj",
        "children", "children_obj",
    ]
    # The relation fields default to an empty list when omitted from the payload
    extra_kwargs = {
        "users": dict(default=list),
        "children": dict(required=False, default=list),
        "parents": dict(required=False, default=list),
    }
class GroupFilter(FilterSet):
    """Filter for groups"""

    # JSON-object filter on the group's attributes, handled by filter_attributes below
    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    members_by_username = ModelMultipleChoiceFilter(
        field_name="users__username",
        to_field_name="username",
        queryset=User.objects.all(),
    )
    members_by_pk = ModelMultipleChoiceFilter(
        field_name="users",
        queryset=User.objects.all(),
    )

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args

        `value` is a JSON object; every key/value pair becomes an ``attributes__<key>``
        lookup on `queryset`.

        Raises:
            ValidationError: `value` is not valid JSON, or is not a JSON object.
        """
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(detail="filter: failed to parse JSON") from None
        if not isinstance(parsed, dict):
            raise ValidationError(detail="filter: value must be key:value mapping")
        lookups = {f"attributes__{key}": wanted for key, wanted in parsed.items()}
        try:
            filtered = queryset.filter(**lookups)
            # Run a cheap probe query so that a ValueError caused by the lookups surfaces
            # here instead of later, without loading every matching row
            filtered.exists()
        except ValueError:
            # Best effort: lookups the database layer rejects are ignored
            return queryset
        return filtered

    class Meta:
        model = Group
        fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]
Filter for groups
def filter_attributes(self, queryset, name, value):
    """Filter attributes by query args

    `value` is a JSON object; every key/value pair becomes an ``attributes__<key>``
    lookup on `queryset`. If applying the lookups raises ValueError, the unfiltered
    `queryset` is returned instead.

    Raises:
        ValidationError: `value` is not valid JSON, or is not a JSON object.
    """
    try:
        parsed = loads(value)
    except ValueError:
        raise ValidationError(detail="filter: failed to parse JSON") from None
    if not isinstance(parsed, dict):
        raise ValidationError(detail="filter: value must be key:value mapping")
    lookups = {f"attributes__{key}": wanted for key, wanted in parsed.items()}
    try:
        filtered = queryset.filter(**lookups)
        # Run a cheap probe query so that a ValueError caused by the lookups surfaces
        # here instead of later, without loading every matching row
        filtered.exists()
    except ValueError:
        # Best effort: lookups the database layer rejects are ignored
        return queryset
    return filtered
Filter attributes by query args
class Meta:
    # FilterSet options: the filtered model and the names of the available filters
    model = Group
    fields = [
        "name",
        "is_superuser",
        "members_by_pk",
        "attributes",
        "members_by_username",
    ]
class GroupViewSet(UsedByMixin, ModelViewSet):
    """Group Viewset"""

    class UserAccountSerializer(PassiveSerializer):
        """Account adding/removing operations"""

        pk = IntegerField(required=True)

    queryset = Group.objects.none()
    serializer_class = GroupSerializer
    search_fields = ["name", "is_superuser"]
    filterset_class = GroupFilter
    ordering = ["name"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        """Return the DjangoQL schema fields available for searching groups."""
        from djangoql.schema import BoolField, StrField

        from authentik.enterprise.search.fields import (
            JSONSearchField,
        )

        return [
            StrField(Group, "name"),
            BoolField(Group, "is_superuser", nullable=True),
            JSONSearchField(Group, "attributes"),
        ]

    def get_queryset(self):
        """Return all groups, prefetching only the relations the serializer will expand."""
        # A single serializer instance is enough to read all include_* flags of the request
        flags = self.serializer_class(context={"request": self.request})
        base_qs = Group.objects.all().prefetch_related("roles")

        if flags._should_include_users:
            # Only fetch fields needed by PartialUserSerializer to reduce DB load and
            # instantiation time
            users_qs = User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS)
        else:
            users_qs = User.objects.all().only("id")
        base_qs = base_qs.prefetch_related(Prefetch("users", queryset=users_qs))

        if flags._should_include_children:
            base_qs = base_qs.prefetch_related("children")

        if flags._should_include_parents:
            base_qs = base_qs.prefetch_related("parents")

        return base_qs

    def _get_viewable_user(self, request: Request, user_pk) -> User:
        """Return the user with primary key `user_pk` among the users `request.user` may view.

        Raises:
            Http404: no such user is among the objects the requesting user holds
                ``authentik_core.view_user`` on.
        """
        user: User = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(
                pk=user_pk,
            )
            .first()
        )
        if not user:
            raise Http404
        return user

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def list(self, request, *args, **kwargs):
        """List groups; overridden only to document the include_* query parameters."""
        return super().list(request, *args, **kwargs)

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def retrieve(self, request, *args, **kwargs):
        """Retrieve a group; overridden only to document the include_* query parameters."""
        return super().retrieve(request, *args, **kwargs)

    @permission_required("authentik_core.add_user_to_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Add user to group"""
        group: Group = self.get_object()
        user = self._get_viewable_user(request, body.validated_data.get("pk"))
        group.users.add(user)
        return Response(status=204)

    @permission_required("authentik_core.remove_user_from_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Remove user from group"""
        group: Group = self.get_object()
        user = self._get_viewable_user(request, body.validated_data.get("pk"))
        group.users.remove(user)
        return Response(status=204)
Group Viewset
def get_ql_fields(self):
    """Return the DjangoQL schema fields available for searching groups.

    The schema classes are imported inside the method rather than at module level.
    """
    from djangoql.schema import BoolField, StrField

    from authentik.enterprise.search.fields import JSONSearchField

    name_field = StrField(Group, "name")
    superuser_field = BoolField(Group, "is_superuser", nullable=True)
    attributes_field = JSONSearchField(Group, "attributes")
    return [name_field, superuser_field, attributes_field]
def get_queryset(self):
    """Return all groups, prefetching only the relations the serializer will expand.

    ``roles`` and ``users`` are always prefetched; users are loaded with only their ID
    unless the request asks for full user objects. ``children`` and ``parents`` are
    prefetched only when the matching include flag is set.
    """
    # A single serializer instance is enough to read all include_* flags of the request
    flags = self.serializer_class(context={"request": self.request})
    base_qs = Group.objects.all().prefetch_related("roles")

    if flags._should_include_users:
        # Only fetch fields needed by PartialUserSerializer to reduce DB load and
        # instantiation time
        users_qs = User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS)
    else:
        users_qs = User.objects.all().only("id")
    base_qs = base_qs.prefetch_related(Prefetch("users", queryset=users_qs))

    if flags._should_include_children:
        base_qs = base_qs.prefetch_related("children")

    if flags._should_include_parents:
        base_qs = base_qs.prefetch_related("parents")

    return base_qs
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using self.queryset.
This method should always be used rather than accessing self.queryset
directly, as self.queryset gets evaluated only once, and those results
are cached for all subsequent requests.
You may want to override this if you need to provide different querysets depending on the incoming request.
(E.g. return a list of items that is specific to the user)
@extend_schema(
    parameters=[
        OpenApiParameter(flag_name, bool, default=flag_default)
        for flag_name, flag_default in (
            ("include_users", True),
            ("include_children", False),
            ("include_parents", False),
            ("include_inherited_roles", False),
        )
    ]
)
def list(self, request, *args, **kwargs):
    """List groups; delegates to the parent class and only adds the include_* schema params."""
    response = super().list(request, *args, **kwargs)
    return response
@extend_schema(
    parameters=[
        OpenApiParameter(flag_name, bool, default=flag_default)
        for flag_name, flag_default in (
            ("include_users", True),
            ("include_children", False),
            ("include_parents", False),
            ("include_inherited_roles", False),
        )
    ]
)
def retrieve(self, request, *args, **kwargs):
    """Retrieve a group; delegates to the parent class and only adds the include_* schema params."""
    response = super().retrieve(request, *args, **kwargs)
    return response
@permission_required("authentik_core.add_user_to_group")
@extend_schema(
    request=UserAccountSerializer,
    responses={
        204: OpenApiResponse(description="User added"),
        404: OpenApiResponse(description="User not found"),
    },
)
@action(
    detail=True,
    methods=["POST"],
    pagination_class=None,
    filter_backends=[],
    permission_classes=[IsAuthenticated],
)
@validate(UserAccountSerializer)
def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
    """Add user to group

    The user is looked up by the ``pk`` from the validated body among the users the
    requester holds ``authentik_core.view_user`` on. Responds with 204 on success and
    raises Http404 when no such user is found.
    """
    group: Group = self.get_object()
    viewable_users = get_objects_for_user(request.user, "authentik_core.view_user")
    requested_pk = body.validated_data.get("pk")
    user: User = viewable_users.filter(pk=requested_pk).first()
    if user:
        group.users.add(user)
        return Response(status=204)
    raise Http404
Add user to group
@permission_required("authentik_core.remove_user_from_group")
@extend_schema(
    request=UserAccountSerializer,
    responses={
        204: OpenApiResponse(description="User removed"),
        404: OpenApiResponse(description="User not found"),
    },
)
@action(
    detail=True,
    methods=["POST"],
    pagination_class=None,
    filter_backends=[],
    permission_classes=[IsAuthenticated],
)
@validate(UserAccountSerializer)
def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
    """Remove user from group

    The user is looked up by the ``pk`` from the validated body among the users the
    requester holds ``authentik_core.view_user`` on. Responds with 204 on success and
    raises Http404 when no such user is found.
    """
    group: Group = self.get_object()
    viewable_users = get_objects_for_user(request.user, "authentik_core.view_user")
    requested_pk = body.validated_data.get("pk")
    user: User = viewable_users.filter(pk=requested_pk).first()
    if user:
        group.users.remove(user)
        return Response(status=204)
    raise Http404
Remove user from group
Inherited Members
class UserAccountSerializer(PassiveSerializer):
    """Account adding/removing operations

    Request body of the add_user/remove_user actions.
    """

    # Primary key of the user to add to or remove from the group
    pk = IntegerField(required=True)
Account adding/removing operations