authentik.core.api.groups
Groups API Viewset
1"""Groups API Viewset""" 2 3from json import loads 4 5from django.db.models import Prefetch 6from django.http import Http404 7from django.utils.translation import gettext as _ 8from django_filters.filters import CharFilter, ModelMultipleChoiceFilter 9from django_filters.filterset import FilterSet 10from djangoql.schema import BoolField, StrField 11from drf_spectacular.utils import ( 12 OpenApiParameter, 13 OpenApiResponse, 14 extend_schema, 15 extend_schema_field, 16) 17from guardian.shortcuts import get_objects_for_user 18from rest_framework.authentication import SessionAuthentication 19from rest_framework.decorators import action 20from rest_framework.fields import CharField, IntegerField, SerializerMethodField 21from rest_framework.permissions import IsAuthenticated 22from rest_framework.relations import ManyRelatedField, PrimaryKeyRelatedField 23from rest_framework.request import Request 24from rest_framework.response import Response 25from rest_framework.serializers import ListSerializer, ValidationError 26from rest_framework.viewsets import ModelViewSet 27 28from authentik.api.authentication import TokenAuthentication 29from authentik.api.search.fields import ( 30 JSONSearchField, 31) 32from authentik.api.validation import validate 33from authentik.core.api.used_by import UsedByMixin 34from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer 35from authentik.core.models import Group, User 36from authentik.endpoints.connectors.agent.auth import AgentAuth 37from authentik.rbac.api.roles import RoleSerializer 38from authentik.rbac.decorators import permission_required 39 40 41class BulkManyRelatedField(ManyRelatedField): 42 """ManyRelatedField that validates all PKs in a single query instead of one per PK.""" 43 44 def to_internal_value(self, data): 45 if isinstance(data, str) or not hasattr(data, "__iter__"): 46 self.fail("not_a_list", input_type=type(data).__name__) 47 if not self.allow_empty and len(data) == 0: 48 self.fail("empty") 49 50 child = self.child_relation 51 pk_field = child.pk_field 52 # Coerce PKs through pk_field if defined 53 pk_map = {} 54 for item in data: 55 if isinstance(item, bool): 56 self.fail("incorrect_type", data_type=type(item).__name__) 57 pk = pk_field.to_internal_value(item) if pk_field else item 58 pk_map[pk] = item # map coerced PK -> original value for error reporting 59 60 queryset = child.get_queryset() 61 # Use count to validate all PKs exist in a single query 62 found_count = queryset.filter(pk__in=pk_map.keys()).count() 63 if found_count < len(pk_map): 64 # Some PKs not found — fall back to per-PK checks for error reporting. 65 # This only runs when there's an actual validation error (rare path). 66 for pk, original in pk_map.items(): 67 if not queryset.filter(pk=pk).exists(): 68 child.fail("does_not_exist", pk_value=original) 69 70 # Return raw PKs — Django's M2M set() accepts both objects and PKs, 71 # using get_prep_value() for type coercion. This avoids loading all 72 # objects into memory and avoids triggering post_init signals. 73 return list(pk_map.keys()) 74 75 def to_representation(self, iterable): 76 # For non-prefetched querysets, get PKs directly without loading model instances. 77 # When prefetched, _result_cache is a list (possibly empty); when not, it's None. 78 if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None: 79 return list(iterable.values_list("pk", flat=True)) 80 return super().to_representation(iterable) 81 82 83class BulkPrimaryKeyRelatedField(PrimaryKeyRelatedField): 84 """PrimaryKeyRelatedField that uses bulk validation when many=True.""" 85 86 @classmethod 87 def many_init(cls, *args, **kwargs): 88 allow_empty = kwargs.pop("allow_empty", None) 89 max_length = kwargs.pop("max_length", None) 90 min_length = kwargs.pop("min_length", None) 91 child_relation = cls(*args, **kwargs) 92 list_kwargs = { 93 "child_relation": child_relation, 94 } 95 if allow_empty is not None: 96 list_kwargs["allow_empty"] = allow_empty 97 if max_length is not None: 98 list_kwargs["max_length"] = max_length 99 if min_length is not None: 100 list_kwargs["min_length"] = min_length 101 list_kwargs.update( 102 { 103 key: value 104 for key, value in kwargs.items() 105 if key in ("required", "default", "source") 106 } 107 ) 108 return BulkManyRelatedField(**list_kwargs) 109 110 111PARTIAL_USER_SERIALIZER_MODEL_FIELDS = [ 112 "pk", 113 "username", 114 "name", 115 "is_active", 116 "last_login", 117 "email", 118 "attributes", 119] 120 121 122class PartialUserSerializer(ModelSerializer): 123 """Partial User Serializer, does not include child relations.""" 124 125 attributes = JSONDictField(required=False) 126 uid = CharField(read_only=True) 127 128 class Meta: 129 model = User 130 fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"] 131 132 133class RelatedGroupSerializer(ModelSerializer): 134 """Stripped down group serializer to show relevant children/parents for groups""" 135 136 attributes = JSONDictField(required=False) 137 138 class Meta: 139 model = Group 140 fields = [ 141 "pk", 142 "name", 143 "is_superuser", 144 "attributes", 145 "group_uuid", 146 ] 147 148 149class GroupSerializer(ModelSerializer): 150 """Group Serializer""" 151 152 attributes = JSONDictField(required=False) 153 users = BulkPrimaryKeyRelatedField(queryset=User.objects.all(), many=True, default=list) 154 parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False) 155 parents_obj = SerializerMethodField(allow_null=True) 156 children_obj = SerializerMethodField(allow_null=True) 157 users_obj = SerializerMethodField(allow_null=True) 158 roles_obj = ListSerializer( 159 child=RoleSerializer(), 160 read_only=True, 161 source="roles", 162 required=False, 163 ) 164 inherited_roles_obj = SerializerMethodField(allow_null=True) 165 num_pk = IntegerField(read_only=True) 166 167 @property 168 def _should_include_users(self) -> bool: 169 request: Request = self.context.get("request", None) 170 if not request: 171 return True 172 return str(request.query_params.get("include_users", "true")).lower() == "true" 173 174 @property 175 def _should_include_children(self) -> bool: 176 request: Request = self.context.get("request", None) 177 if not request: 178 return True 179 return str(request.query_params.get("include_children", "false")).lower() == "true" 180 181 @property 182 def _should_include_parents(self) -> bool: 183 request: Request = self.context.get("request", None) 184 if not request: 185 return True 186 return str(request.query_params.get("include_parents", "false")).lower() == "true" 187 188 @property 189 def _should_include_inherited_roles(self) -> bool: 190 request: Request = self.context.get("request", None) 191 if not request: 192 return True 193 return str(request.query_params.get("include_inherited_roles", "false")).lower() == "true" 194 195 @extend_schema_field(PartialUserSerializer(many=True)) 196 def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None: 197 if not self._should_include_users: 198 return None 199 return PartialUserSerializer(instance.users, many=True).data 200 201 @extend_schema_field(RelatedGroupSerializer(many=True)) 202 def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None: 203 if not self._should_include_children: 204 return None 205 return RelatedGroupSerializer(instance.children, many=True).data 206 207 @extend_schema_field(RelatedGroupSerializer(many=True)) 208 def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None: 209 if not self._should_include_parents: 210 return None 211 return RelatedGroupSerializer(instance.parents, many=True).data 212 213 @extend_schema_field(RoleSerializer(many=True)) 214 def get_inherited_roles_obj(self, instance: Group) -> list | None: 215 """Return only inherited roles from ancestor groups (excludes direct roles)""" 216 if not self._should_include_inherited_roles: 217 return None 218 direct_role_pks = instance.roles.values_list("pk", flat=True) 219 inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks) 220 return RoleSerializer(inherited_roles, many=True).data 221 222 def validate_is_superuser(self, superuser: bool): 223 """Ensure that the user creating this group has permissions to set the superuser flag""" 224 request: Request = self.context.get("request", None) 225 if not request: 226 return superuser 227 # If we're updating an instance, and the state hasn't changed, we don't need to check perms 228 if self.instance and superuser == self.instance.is_superuser: 229 return superuser 230 user: User = request.user 231 perm = ( 232 "authentik_core.enable_group_superuser" 233 if superuser 234 else "authentik_core.disable_group_superuser" 235 ) 236 if self.instance or superuser: 237 has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance) 238 if not has_perm: 239 raise ValidationError( 240 _( 241 ( 242 "User does not have permission to set " 243 "superuser status to {superuser_status}." 244 ).format_map({"superuser_status": superuser}) 245 ) 246 ) 247 return superuser 248 249 def validate_users(self, users: list) -> list: 250 """Require add_user_to_group permission when adding new members via group PATCH.""" 251 request: Request = self.context.get("request", None) 252 if not request: 253 return users 254 if not self.instance: 255 return users 256 # BulkManyRelatedField returns raw PKs, not model instances 257 current_user_pks = set(self.instance.users.values_list("pk", flat=True)) 258 new_users = [u for u in users if u not in current_user_pks] 259 if not new_users: 260 return users 261 has_perm = request.user.has_perm( 262 "authentik_core.add_user_to_group" 263 ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance) 264 if not has_perm: 265 raise ValidationError(_("User does not have permission to add members to this group.")) 266 return users 267 268 class Meta: 269 model = Group 270 fields = [ 271 "pk", 272 "num_pk", 273 "name", 274 "is_superuser", 275 "parents", 276 "parents_obj", 277 "users", 278 "users_obj", 279 "attributes", 280 "roles", 281 "roles_obj", 282 "inherited_roles_obj", 283 "children", 284 "children_obj", 285 ] 286 extra_kwargs = { 287 "children": { 288 "required": False, 289 "default": list, 290 }, 291 "parents": { 292 "required": False, 293 "default": list, 294 }, 295 } 296 297 298class GroupFilter(FilterSet): 299 """Filter for groups""" 300 301 attributes = CharFilter( 302 field_name="attributes", 303 lookup_expr="", 304 label="Attributes", 305 method="filter_attributes", 306 ) 307 308 members_by_username = ModelMultipleChoiceFilter( 309 field_name="users__username", 310 to_field_name="username", 311 queryset=User.objects.all(), 312 ) 313 members_by_pk = ModelMultipleChoiceFilter( 314 field_name="users", 315 queryset=User.objects.all(), 316 distinct=False, 317 ) 318 319 def filter_attributes(self, queryset, name, value): 320 """Filter attributes by query args""" 321 try: 322 value = loads(value) 323 except ValueError: 324 raise ValidationError(detail="filter: failed to parse JSON") from None 325 if not isinstance(value, dict): 326 raise ValidationError(detail="filter: value must be key:value mapping") 327 qs = {} 328 for key, _value in value.items(): 329 qs[f"attributes__{key}"] = _value 330 try: 331 _ = len(queryset.filter(**qs)) 332 return queryset.filter(**qs) 333 except ValueError: 334 return queryset 335 336 class Meta: 337 model = Group 338 fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"] 339 340 341class GroupViewSet(UsedByMixin, ModelViewSet): 342 """Group Viewset""" 343 344 class UserAccountSerializer(PassiveSerializer): 345 """Account adding/removing operations""" 346 347 pk = IntegerField(required=True) 348 349 queryset = Group.objects.none() 350 serializer_class = GroupSerializer 351 search_fields = ["name", "is_superuser"] 352 filterset_class = GroupFilter 353 ordering = ["name"] 354 authentication_classes = [ 355 TokenAuthentication, 356 SessionAuthentication, 357 AgentAuth, 358 ] 359 360 def get_ql_fields(self): 361 return [ 362 StrField(Group, "name"), 363 BoolField(Group, "is_superuser", nullable=True), 364 JSONSearchField(Group, "attributes"), 365 ] 366 367 def get_queryset(self): 368 # Always prefetch parents and children since their PKs are always serialized 369 base_qs = Group.objects.all().prefetch_related("roles", "parents", "children") 370 371 if self.serializer_class(context={"request": self.request})._should_include_users: 372 # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation 373 # time 374 base_qs = base_qs.prefetch_related( 375 Prefetch( 376 "users", 377 queryset=User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS), 378 ) 379 ) 380 # When include_users=false, skip users prefetch entirely. 381 # BulkManyRelatedField.to_representation will use values_list to get PKs 382 # directly without loading User instances into memory. 383 384 return base_qs 385 386 @extend_schema( 387 parameters=[ 388 OpenApiParameter("include_users", bool, default=True), 389 OpenApiParameter("include_children", bool, default=False), 390 OpenApiParameter("include_parents", bool, default=False), 391 OpenApiParameter("include_inherited_roles", bool, default=False), 392 ] 393 ) 394 def list(self, request, *args, **kwargs): 395 return super().list(request, *args, **kwargs) 396 397 @extend_schema( 398 parameters=[ 399 OpenApiParameter("include_users", bool, default=True), 400 OpenApiParameter("include_children", bool, default=False), 401 OpenApiParameter("include_parents", bool, default=False), 402 OpenApiParameter("include_inherited_roles", bool, default=False), 403 ] 404 ) 405 def retrieve(self, request, *args, **kwargs): 406 return super().retrieve(request, *args, **kwargs) 407 408 @permission_required("authentik_core.add_user_to_group") 409 @extend_schema( 410 request=UserAccountSerializer, 411 responses={ 412 204: OpenApiResponse(description="User added"), 413 404: OpenApiResponse(description="User not found"), 414 }, 415 ) 416 @action( 417 detail=True, 418 methods=["POST"], 419 pagination_class=None, 420 filter_backends=[], 421 permission_classes=[IsAuthenticated], 422 ) 423 @validate(UserAccountSerializer) 424 def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 425 """Add user to group""" 426 group: Group = self.get_object() 427 user: User = ( 428 get_objects_for_user(request.user, "authentik_core.view_user") 429 .filter( 430 pk=body.validated_data.get("pk"), 431 ) 432 .first() 433 ) 434 if not user: 435 raise Http404 436 group.users.add(user) 437 return Response(status=204) 438 439 @permission_required("authentik_core.remove_user_from_group") 440 @extend_schema( 441 request=UserAccountSerializer, 442 responses={ 443 204: OpenApiResponse(description="User removed"), 444 404: OpenApiResponse(description="User not found"), 445 }, 446 ) 447 @action( 448 detail=True, 449 methods=["POST"], 450 pagination_class=None, 451 filter_backends=[], 452 permission_classes=[IsAuthenticated], 453 ) 454 @validate(UserAccountSerializer) 455 def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 456 """Remove user from group""" 457 group: Group = self.get_object() 458 user: User = ( 459 get_objects_for_user(request.user, "authentik_core.view_user") 460 .filter( 461 pk=body.validated_data.get("pk"), 462 ) 463 .first() 464 ) 465 if not user: 466 raise Http404 467 group.users.remove(user) 468 return Response(status=204)
42class BulkManyRelatedField(ManyRelatedField): 43 """ManyRelatedField that validates all PKs in a single query instead of one per PK.""" 44 45 def to_internal_value(self, data): 46 if isinstance(data, str) or not hasattr(data, "__iter__"): 47 self.fail("not_a_list", input_type=type(data).__name__) 48 if not self.allow_empty and len(data) == 0: 49 self.fail("empty") 50 51 child = self.child_relation 52 pk_field = child.pk_field 53 # Coerce PKs through pk_field if defined 54 pk_map = {} 55 for item in data: 56 if isinstance(item, bool): 57 self.fail("incorrect_type", data_type=type(item).__name__) 58 pk = pk_field.to_internal_value(item) if pk_field else item 59 pk_map[pk] = item # map coerced PK -> original value for error reporting 60 61 queryset = child.get_queryset() 62 # Use count to validate all PKs exist in a single query 63 found_count = queryset.filter(pk__in=pk_map.keys()).count() 64 if found_count < len(pk_map): 65 # Some PKs not found — fall back to per-PK checks for error reporting. 66 # This only runs when there's an actual validation error (rare path). 67 for pk, original in pk_map.items(): 68 if not queryset.filter(pk=pk).exists(): 69 child.fail("does_not_exist", pk_value=original) 70 71 # Return raw PKs — Django's M2M set() accepts both objects and PKs, 72 # using get_prep_value() for type coercion. This avoids loading all 73 # objects into memory and avoids triggering post_init signals. 74 return list(pk_map.keys()) 75 76 def to_representation(self, iterable): 77 # For non-prefetched querysets, get PKs directly without loading model instances. 78 # When prefetched, _result_cache is a list (possibly empty); when not, it's None. 79 if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None: 80 return list(iterable.values_list("pk", flat=True)) 81 return super().to_representation(iterable)
ManyRelatedField that validates all PKs in a single query instead of one per PK.
45 def to_internal_value(self, data): 46 if isinstance(data, str) or not hasattr(data, "__iter__"): 47 self.fail("not_a_list", input_type=type(data).__name__) 48 if not self.allow_empty and len(data) == 0: 49 self.fail("empty") 50 51 child = self.child_relation 52 pk_field = child.pk_field 53 # Coerce PKs through pk_field if defined 54 pk_map = {} 55 for item in data: 56 if isinstance(item, bool): 57 self.fail("incorrect_type", data_type=type(item).__name__) 58 pk = pk_field.to_internal_value(item) if pk_field else item 59 pk_map[pk] = item # map coerced PK -> original value for error reporting 60 61 queryset = child.get_queryset() 62 # Use count to validate all PKs exist in a single query 63 found_count = queryset.filter(pk__in=pk_map.keys()).count() 64 if found_count < len(pk_map): 65 # Some PKs not found — fall back to per-PK checks for error reporting. 66 # This only runs when there's an actual validation error (rare path). 67 for pk, original in pk_map.items(): 68 if not queryset.filter(pk=pk).exists(): 69 child.fail("does_not_exist", pk_value=original) 70 71 # Return raw PKs — Django's M2M set() accepts both objects and PKs, 72 # using get_prep_value() for type coercion. This avoids loading all 73 # objects into memory and avoids triggering post_init signals. 74 return list(pk_map.keys())
Transform the incoming primitive data into a native value.
76 def to_representation(self, iterable): 77 # For non-prefetched querysets, get PKs directly without loading model instances. 78 # When prefetched, _result_cache is a list (possibly empty); when not, it's None. 79 if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None: 80 return list(iterable.values_list("pk", flat=True)) 81 return super().to_representation(iterable)
Transform the outgoing native value into primitive data.
84class BulkPrimaryKeyRelatedField(PrimaryKeyRelatedField): 85 """PrimaryKeyRelatedField that uses bulk validation when many=True.""" 86 87 @classmethod 88 def many_init(cls, *args, **kwargs): 89 allow_empty = kwargs.pop("allow_empty", None) 90 max_length = kwargs.pop("max_length", None) 91 min_length = kwargs.pop("min_length", None) 92 child_relation = cls(*args, **kwargs) 93 list_kwargs = { 94 "child_relation": child_relation, 95 } 96 if allow_empty is not None: 97 list_kwargs["allow_empty"] = allow_empty 98 if max_length is not None: 99 list_kwargs["max_length"] = max_length 100 if min_length is not None: 101 list_kwargs["min_length"] = min_length 102 list_kwargs.update( 103 { 104 key: value 105 for key, value in kwargs.items() 106 if key in ("required", "default", "source") 107 } 108 ) 109 return BulkManyRelatedField(**list_kwargs)
PrimaryKeyRelatedField that uses bulk validation when many=True.
87 @classmethod 88 def many_init(cls, *args, **kwargs): 89 allow_empty = kwargs.pop("allow_empty", None) 90 max_length = kwargs.pop("max_length", None) 91 min_length = kwargs.pop("min_length", None) 92 child_relation = cls(*args, **kwargs) 93 list_kwargs = { 94 "child_relation": child_relation, 95 } 96 if allow_empty is not None: 97 list_kwargs["allow_empty"] = allow_empty 98 if max_length is not None: 99 list_kwargs["max_length"] = max_length 100 if min_length is not None: 101 list_kwargs["min_length"] = min_length 102 list_kwargs.update( 103 { 104 key: value 105 for key, value in kwargs.items() 106 if key in ("required", "default", "source") 107 } 108 ) 109 return BulkManyRelatedField(**list_kwargs)
This method handles creating a parent ManyRelatedField instance
when the many=True keyword argument is passed.
Typically you won't need to override this method.
Note that we're over-cautious in passing most arguments to both parent and child classes in order to try to cover the general case. If you're overriding this method you'll probably want something much simpler, eg:
@classmethod def many_init(cls, *args, **kwargs): kwargs['child'] = cls() return CustomManyRelatedField(*args, **kwargs)
123class PartialUserSerializer(ModelSerializer): 124 """Partial User Serializer, does not include child relations.""" 125 126 attributes = JSONDictField(required=False) 127 uid = CharField(read_only=True) 128 129 class Meta: 130 model = User 131 fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"]
Partial User Serializer, does not include child relations.
Inherited Members
134class RelatedGroupSerializer(ModelSerializer): 135 """Stripped down group serializer to show relevant children/parents for groups""" 136 137 attributes = JSONDictField(required=False) 138 139 class Meta: 140 model = Group 141 fields = [ 142 "pk", 143 "name", 144 "is_superuser", 145 "attributes", 146 "group_uuid", 147 ]
Stripped down group serializer to show relevant children/parents for groups
Inherited Members
139 class Meta: 140 model = Group 141 fields = [ 142 "pk", 143 "name", 144 "is_superuser", 145 "attributes", 146 "group_uuid", 147 ]
150class GroupSerializer(ModelSerializer): 151 """Group Serializer""" 152 153 attributes = JSONDictField(required=False) 154 users = BulkPrimaryKeyRelatedField(queryset=User.objects.all(), many=True, default=list) 155 parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False) 156 parents_obj = SerializerMethodField(allow_null=True) 157 children_obj = SerializerMethodField(allow_null=True) 158 users_obj = SerializerMethodField(allow_null=True) 159 roles_obj = ListSerializer( 160 child=RoleSerializer(), 161 read_only=True, 162 source="roles", 163 required=False, 164 ) 165 inherited_roles_obj = SerializerMethodField(allow_null=True) 166 num_pk = IntegerField(read_only=True) 167 168 @property 169 def _should_include_users(self) -> bool: 170 request: Request = self.context.get("request", None) 171 if not request: 172 return True 173 return str(request.query_params.get("include_users", "true")).lower() == "true" 174 175 @property 176 def _should_include_children(self) -> bool: 177 request: Request = self.context.get("request", None) 178 if not request: 179 return True 180 return str(request.query_params.get("include_children", "false")).lower() == "true" 181 182 @property 183 def _should_include_parents(self) -> bool: 184 request: Request = self.context.get("request", None) 185 if not request: 186 return True 187 return str(request.query_params.get("include_parents", "false")).lower() == "true" 188 189 @property 190 def _should_include_inherited_roles(self) -> bool: 191 request: Request = self.context.get("request", None) 192 if not request: 193 return True 194 return str(request.query_params.get("include_inherited_roles", "false")).lower() == "true" 195 196 @extend_schema_field(PartialUserSerializer(many=True)) 197 def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None: 198 if not self._should_include_users: 199 return None 200 return PartialUserSerializer(instance.users, many=True).data 201 202 @extend_schema_field(RelatedGroupSerializer(many=True)) 203 def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None: 204 if not self._should_include_children: 205 return None 206 return RelatedGroupSerializer(instance.children, many=True).data 207 208 @extend_schema_field(RelatedGroupSerializer(many=True)) 209 def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None: 210 if not self._should_include_parents: 211 return None 212 return RelatedGroupSerializer(instance.parents, many=True).data 213 214 @extend_schema_field(RoleSerializer(many=True)) 215 def get_inherited_roles_obj(self, instance: Group) -> list | None: 216 """Return only inherited roles from ancestor groups (excludes direct roles)""" 217 if not self._should_include_inherited_roles: 218 return None 219 direct_role_pks = instance.roles.values_list("pk", flat=True) 220 inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks) 221 return RoleSerializer(inherited_roles, many=True).data 222 223 def validate_is_superuser(self, superuser: bool): 224 """Ensure that the user creating this group has permissions to set the superuser flag""" 225 request: Request = self.context.get("request", None) 226 if not request: 227 return superuser 228 # If we're updating an instance, and the state hasn't changed, we don't need to check perms 229 if self.instance and superuser == self.instance.is_superuser: 230 return superuser 231 user: User = request.user 232 perm = ( 233 "authentik_core.enable_group_superuser" 234 if superuser 235 else "authentik_core.disable_group_superuser" 236 ) 237 if self.instance or superuser: 238 has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance) 239 if not has_perm: 240 raise ValidationError( 241 _( 242 ( 243 "User does not have permission to set " 244 "superuser status to {superuser_status}." 245 ).format_map({"superuser_status": superuser}) 246 ) 247 ) 248 return superuser 249 250 def validate_users(self, users: list) -> list: 251 """Require add_user_to_group permission when adding new members via group PATCH.""" 252 request: Request = self.context.get("request", None) 253 if not request: 254 return users 255 if not self.instance: 256 return users 257 # BulkManyRelatedField returns raw PKs, not model instances 258 current_user_pks = set(self.instance.users.values_list("pk", flat=True)) 259 new_users = [u for u in users if u not in current_user_pks] 260 if not new_users: 261 return users 262 has_perm = request.user.has_perm( 263 "authentik_core.add_user_to_group" 264 ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance) 265 if not has_perm: 266 raise ValidationError(_("User does not have permission to add members to this group.")) 267 return users 268 269 class Meta: 270 model = Group 271 fields = [ 272 "pk", 273 "num_pk", 274 "name", 275 "is_superuser", 276 "parents", 277 "parents_obj", 278 "users", 279 "users_obj", 280 "attributes", 281 "roles", 282 "roles_obj", 283 "inherited_roles_obj", 284 "children", 285 "children_obj", 286 ] 287 extra_kwargs = { 288 "children": { 289 "required": False, 290 "default": list, 291 }, 292 "parents": { 293 "required": False, 294 "default": list, 295 }, 296 }
Group Serializer
214 @extend_schema_field(RoleSerializer(many=True)) 215 def get_inherited_roles_obj(self, instance: Group) -> list | None: 216 """Return only inherited roles from ancestor groups (excludes direct roles)""" 217 if not self._should_include_inherited_roles: 218 return None 219 direct_role_pks = instance.roles.values_list("pk", flat=True) 220 inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks) 221 return RoleSerializer(inherited_roles, many=True).data
Return only inherited roles from ancestor groups (excludes direct roles)
223 def validate_is_superuser(self, superuser: bool): 224 """Ensure that the user creating this group has permissions to set the superuser flag""" 225 request: Request = self.context.get("request", None) 226 if not request: 227 return superuser 228 # If we're updating an instance, and the state hasn't changed, we don't need to check perms 229 if self.instance and superuser == self.instance.is_superuser: 230 return superuser 231 user: User = request.user 232 perm = ( 233 "authentik_core.enable_group_superuser" 234 if superuser 235 else "authentik_core.disable_group_superuser" 236 ) 237 if self.instance or superuser: 238 has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance) 239 if not has_perm: 240 raise ValidationError( 241 _( 242 ( 243 "User does not have permission to set " 244 "superuser status to {superuser_status}." 245 ).format_map({"superuser_status": superuser}) 246 ) 247 ) 248 return superuser
Ensure that the user creating this group has permissions to set the superuser flag
250 def validate_users(self, users: list) -> list: 251 """Require add_user_to_group permission when adding new members via group PATCH.""" 252 request: Request = self.context.get("request", None) 253 if not request: 254 return users 255 if not self.instance: 256 return users 257 # BulkManyRelatedField returns raw PKs, not model instances 258 current_user_pks = set(self.instance.users.values_list("pk", flat=True)) 259 new_users = [u for u in users if u not in current_user_pks] 260 if not new_users: 261 return users 262 has_perm = request.user.has_perm( 263 "authentik_core.add_user_to_group" 264 ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance) 265 if not has_perm: 266 raise ValidationError(_("User does not have permission to add members to this group.")) 267 return users
Require add_user_to_group permission when adding new members via group PATCH.
Inherited Members
269 class Meta: 270 model = Group 271 fields = [ 272 "pk", 273 "num_pk", 274 "name", 275 "is_superuser", 276 "parents", 277 "parents_obj", 278 "users", 279 "users_obj", 280 "attributes", 281 "roles", 282 "roles_obj", 283 "inherited_roles_obj", 284 "children", 285 "children_obj", 286 ] 287 extra_kwargs = { 288 "children": { 289 "required": False, 290 "default": list, 291 }, 292 "parents": { 293 "required": False, 294 "default": list, 295 }, 296 }
299class GroupFilter(FilterSet): 300 """Filter for groups""" 301 302 attributes = CharFilter( 303 field_name="attributes", 304 lookup_expr="", 305 label="Attributes", 306 method="filter_attributes", 307 ) 308 309 members_by_username = ModelMultipleChoiceFilter( 310 field_name="users__username", 311 to_field_name="username", 312 queryset=User.objects.all(), 313 ) 314 members_by_pk = ModelMultipleChoiceFilter( 315 field_name="users", 316 queryset=User.objects.all(), 317 distinct=False, 318 ) 319 320 def filter_attributes(self, queryset, name, value): 321 """Filter attributes by query args""" 322 try: 323 value = loads(value) 324 except ValueError: 325 raise ValidationError(detail="filter: failed to parse JSON") from None 326 if not isinstance(value, dict): 327 raise ValidationError(detail="filter: value must be key:value mapping") 328 qs = {} 329 for key, _value in value.items(): 330 qs[f"attributes__{key}"] = _value 331 try: 332 _ = len(queryset.filter(**qs)) 333 return queryset.filter(**qs) 334 except ValueError: 335 return queryset 336 337 class Meta: 338 model = Group 339 fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]
Filter for groups
320 def filter_attributes(self, queryset, name, value): 321 """Filter attributes by query args""" 322 try: 323 value = loads(value) 324 except ValueError: 325 raise ValidationError(detail="filter: failed to parse JSON") from None 326 if not isinstance(value, dict): 327 raise ValidationError(detail="filter: value must be key:value mapping") 328 qs = {} 329 for key, _value in value.items(): 330 qs[f"attributes__{key}"] = _value 331 try: 332 _ = len(queryset.filter(**qs)) 333 return queryset.filter(**qs) 334 except ValueError: 335 return queryset
Filter attributes by query args
337 class Meta: 338 model = Group 339 fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]
342class GroupViewSet(UsedByMixin, ModelViewSet): 343 """Group Viewset""" 344 345 class UserAccountSerializer(PassiveSerializer): 346 """Account adding/removing operations""" 347 348 pk = IntegerField(required=True) 349 350 queryset = Group.objects.none() 351 serializer_class = GroupSerializer 352 search_fields = ["name", "is_superuser"] 353 filterset_class = GroupFilter 354 ordering = ["name"] 355 authentication_classes = [ 356 TokenAuthentication, 357 SessionAuthentication, 358 AgentAuth, 359 ] 360 361 def get_ql_fields(self): 362 return [ 363 StrField(Group, "name"), 364 BoolField(Group, "is_superuser", nullable=True), 365 JSONSearchField(Group, "attributes"), 366 ] 367 368 def get_queryset(self): 369 # Always prefetch parents and children since their PKs are always serialized 370 base_qs = Group.objects.all().prefetch_related("roles", "parents", "children") 371 372 if self.serializer_class(context={"request": self.request})._should_include_users: 373 # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation 374 # time 375 base_qs = base_qs.prefetch_related( 376 Prefetch( 377 "users", 378 queryset=User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS), 379 ) 380 ) 381 # When include_users=false, skip users prefetch entirely. 382 # BulkManyRelatedField.to_representation will use values_list to get PKs 383 # directly without loading User instances into memory. 384 385 return base_qs 386 387 @extend_schema( 388 parameters=[ 389 OpenApiParameter("include_users", bool, default=True), 390 OpenApiParameter("include_children", bool, default=False), 391 OpenApiParameter("include_parents", bool, default=False), 392 OpenApiParameter("include_inherited_roles", bool, default=False), 393 ] 394 ) 395 def list(self, request, *args, **kwargs): 396 return super().list(request, *args, **kwargs) 397 398 @extend_schema( 399 parameters=[ 400 OpenApiParameter("include_users", bool, default=True), 401 OpenApiParameter("include_children", bool, default=False), 402 OpenApiParameter("include_parents", bool, default=False), 403 OpenApiParameter("include_inherited_roles", bool, default=False), 404 ] 405 ) 406 def retrieve(self, request, *args, **kwargs): 407 return super().retrieve(request, *args, **kwargs) 408 409 @permission_required("authentik_core.add_user_to_group") 410 @extend_schema( 411 request=UserAccountSerializer, 412 responses={ 413 204: OpenApiResponse(description="User added"), 414 404: OpenApiResponse(description="User not found"), 415 }, 416 ) 417 @action( 418 detail=True, 419 methods=["POST"], 420 pagination_class=None, 421 filter_backends=[], 422 permission_classes=[IsAuthenticated], 423 ) 424 @validate(UserAccountSerializer) 425 def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 426 """Add user to group""" 427 group: Group = self.get_object() 428 user: User = ( 429 get_objects_for_user(request.user, "authentik_core.view_user") 430 .filter( 431 pk=body.validated_data.get("pk"), 432 ) 433 .first() 434 ) 435 if not user: 436 raise Http404 437 group.users.add(user) 438 return Response(status=204) 439 440 @permission_required("authentik_core.remove_user_from_group") 441 @extend_schema( 442 request=UserAccountSerializer, 443 responses={ 444 204: OpenApiResponse(description="User removed"), 445 404: OpenApiResponse(description="User not found"), 446 }, 447 ) 448 @action( 449 detail=True, 450 methods=["POST"], 451 pagination_class=None, 452 filter_backends=[], 453 permission_classes=[IsAuthenticated], 454 ) 455 @validate(UserAccountSerializer) 456 def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 457 """Remove user from group""" 458 group: Group = self.get_object() 459 user: User = ( 460 get_objects_for_user(request.user, "authentik_core.view_user") 461 .filter( 462 pk=body.validated_data.get("pk"), 463 ) 464 .first() 465 ) 466 if not user: 467 raise Http404 468 group.users.remove(user) 469 return Response(status=204)
Group Viewset
368 def get_queryset(self): 369 # Always prefetch parents and children since their PKs are always serialized 370 base_qs = Group.objects.all().prefetch_related("roles", "parents", "children") 371 372 if self.serializer_class(context={"request": self.request})._should_include_users: 373 # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation 374 # time 375 base_qs = base_qs.prefetch_related( 376 Prefetch( 377 "users", 378 queryset=User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS), 379 ) 380 ) 381 # When include_users=false, skip users prefetch entirely. 382 # BulkManyRelatedField.to_representation will use values_list to get PKs 383 # directly without loading User instances into memory. 384 385 return base_qs
Get the list of items for this view.
This must be an iterable, and may be a queryset.
Defaults to using self.queryset.
This method should always be used rather than accessing self.queryset
directly, as self.queryset gets evaluated only once, and those results
are cached for all subsequent requests.
You may want to override this if you need to provide different querysets depending on the incoming request.
(Eg. return a list of items that is specific to the user)
387 @extend_schema( 388 parameters=[ 389 OpenApiParameter("include_users", bool, default=True), 390 OpenApiParameter("include_children", bool, default=False), 391 OpenApiParameter("include_parents", bool, default=False), 392 OpenApiParameter("include_inherited_roles", bool, default=False), 393 ] 394 ) 395 def list(self, request, *args, **kwargs): 396 return super().list(request, *args, **kwargs)
398 @extend_schema( 399 parameters=[ 400 OpenApiParameter("include_users", bool, default=True), 401 OpenApiParameter("include_children", bool, default=False), 402 OpenApiParameter("include_parents", bool, default=False), 403 OpenApiParameter("include_inherited_roles", bool, default=False), 404 ] 405 ) 406 def retrieve(self, request, *args, **kwargs): 407 return super().retrieve(request, *args, **kwargs)
409 @permission_required("authentik_core.add_user_to_group") 410 @extend_schema( 411 request=UserAccountSerializer, 412 responses={ 413 204: OpenApiResponse(description="User added"), 414 404: OpenApiResponse(description="User not found"), 415 }, 416 ) 417 @action( 418 detail=True, 419 methods=["POST"], 420 pagination_class=None, 421 filter_backends=[], 422 permission_classes=[IsAuthenticated], 423 ) 424 @validate(UserAccountSerializer) 425 def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 426 """Add user to group""" 427 group: Group = self.get_object() 428 user: User = ( 429 get_objects_for_user(request.user, "authentik_core.view_user") 430 .filter( 431 pk=body.validated_data.get("pk"), 432 ) 433 .first() 434 ) 435 if not user: 436 raise Http404 437 group.users.add(user) 438 return Response(status=204)
Add user to group
440 @permission_required("authentik_core.remove_user_from_group") 441 @extend_schema( 442 request=UserAccountSerializer, 443 responses={ 444 204: OpenApiResponse(description="User removed"), 445 404: OpenApiResponse(description="User not found"), 446 }, 447 ) 448 @action( 449 detail=True, 450 methods=["POST"], 451 pagination_class=None, 452 filter_backends=[], 453 permission_classes=[IsAuthenticated], 454 ) 455 @validate(UserAccountSerializer) 456 def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response: 457 """Remove user from group""" 458 group: Group = self.get_object() 459 user: User = ( 460 get_objects_for_user(request.user, "authentik_core.view_user") 461 .filter( 462 pk=body.validated_data.get("pk"), 463 ) 464 .first() 465 ) 466 if not user: 467 raise Http404 468 group.users.remove(user) 469 return Response(status=204)
Remove user from group
Inherited Members
345 class UserAccountSerializer(PassiveSerializer): 346 """Account adding/removing operations""" 347 348 pk = IntegerField(required=True)
Account adding/removing operations