authentik.core.api.groups

Groups API Viewset

  1"""Groups API Viewset"""
  2
  3from json import loads
  4
  5from django.db.models import Prefetch
  6from django.http import Http404
  7from django.utils.translation import gettext as _
  8from django_filters.filters import CharFilter, ModelMultipleChoiceFilter
  9from django_filters.filterset import FilterSet
 10from djangoql.schema import BoolField, StrField
 11from drf_spectacular.utils import (
 12    OpenApiParameter,
 13    OpenApiResponse,
 14    extend_schema,
 15    extend_schema_field,
 16)
 17from guardian.shortcuts import get_objects_for_user
 18from rest_framework.authentication import SessionAuthentication
 19from rest_framework.decorators import action
 20from rest_framework.fields import CharField, IntegerField, SerializerMethodField
 21from rest_framework.permissions import IsAuthenticated
 22from rest_framework.relations import ManyRelatedField, PrimaryKeyRelatedField
 23from rest_framework.request import Request
 24from rest_framework.response import Response
 25from rest_framework.serializers import ListSerializer, ValidationError
 26from rest_framework.viewsets import ModelViewSet
 27
 28from authentik.api.authentication import TokenAuthentication
 29from authentik.api.search.fields import (
 30    JSONSearchField,
 31)
 32from authentik.api.validation import validate
 33from authentik.core.api.used_by import UsedByMixin
 34from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
 35from authentik.core.models import Group, User
 36from authentik.endpoints.connectors.agent.auth import AgentAuth
 37from authentik.rbac.api.roles import RoleSerializer
 38from authentik.rbac.decorators import permission_required
 39
 40
 41class BulkManyRelatedField(ManyRelatedField):
 42    """ManyRelatedField that validates all PKs in a single query instead of one per PK."""
 43
 44    def to_internal_value(self, data):
 45        if isinstance(data, str) or not hasattr(data, "__iter__"):
 46            self.fail("not_a_list", input_type=type(data).__name__)
 47        if not self.allow_empty and len(data) == 0:
 48            self.fail("empty")
 49
 50        child = self.child_relation
 51        pk_field = child.pk_field
 52        # Coerce PKs through pk_field if defined
 53        pk_map = {}
 54        for item in data:
 55            if isinstance(item, bool):
 56                self.fail("incorrect_type", data_type=type(item).__name__)
 57            pk = pk_field.to_internal_value(item) if pk_field else item
 58            pk_map[pk] = item  # map coerced PK -> original value for error reporting
 59
 60        queryset = child.get_queryset()
 61        # Use count to validate all PKs exist in a single query
 62        found_count = queryset.filter(pk__in=pk_map.keys()).count()
 63        if found_count < len(pk_map):
 64            # Some PKs not found — fall back to per-PK checks for error reporting.
 65            # This only runs when there's an actual validation error (rare path).
 66            for pk, original in pk_map.items():
 67                if not queryset.filter(pk=pk).exists():
 68                    child.fail("does_not_exist", pk_value=original)
 69
 70        # Return raw PKs — Django's M2M set() accepts both objects and PKs,
 71        # using get_prep_value() for type coercion. This avoids loading all
 72        # objects into memory and avoids triggering post_init signals.
 73        return list(pk_map.keys())
 74
 75    def to_representation(self, iterable):
 76        # For non-prefetched querysets, get PKs directly without loading model instances.
 77        # When prefetched, _result_cache is a list (possibly empty); when not, it's None.
 78        if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None:
 79            return list(iterable.values_list("pk", flat=True))
 80        return super().to_representation(iterable)
 81
 82
 83class BulkPrimaryKeyRelatedField(PrimaryKeyRelatedField):
 84    """PrimaryKeyRelatedField that uses bulk validation when many=True."""
 85
 86    @classmethod
 87    def many_init(cls, *args, **kwargs):
 88        allow_empty = kwargs.pop("allow_empty", None)
 89        max_length = kwargs.pop("max_length", None)
 90        min_length = kwargs.pop("min_length", None)
 91        child_relation = cls(*args, **kwargs)
 92        list_kwargs = {
 93            "child_relation": child_relation,
 94        }
 95        if allow_empty is not None:
 96            list_kwargs["allow_empty"] = allow_empty
 97        if max_length is not None:
 98            list_kwargs["max_length"] = max_length
 99        if min_length is not None:
100            list_kwargs["min_length"] = min_length
101        list_kwargs.update(
102            {
103                key: value
104                for key, value in kwargs.items()
105                if key in ("required", "default", "source")
106            }
107        )
108        return BulkManyRelatedField(**list_kwargs)
109
110
# User model fields exposed by PartialUserSerializer. GroupViewSet.get_queryset
# passes the same list to .only() when prefetching group members, so the
# prefetch loads exactly the columns the serializer reads.
PARTIAL_USER_SERIALIZER_MODEL_FIELDS = [
    "pk",
    "username",
    "name",
    "is_active",
    "last_login",
    "email",
    "attributes",
]
120
121
class PartialUserSerializer(ModelSerializer):
    """Partial User Serializer, does not include child relations."""

    # Optional on input (required=False).
    attributes = JSONDictField(required=False)
    # Read-only; serialized in addition to the shared model field list below.
    uid = CharField(read_only=True)

    class Meta:
        model = User
        # Shared model field list plus the serializer-declared "uid" field.
        fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"]
131
132
class RelatedGroupSerializer(ModelSerializer):
    """Stripped down group serializer to show relevant children/parents for groups"""

    # Optional on input (required=False).
    attributes = JSONDictField(required=False)

    class Meta:
        model = Group
        # Deliberately omits the users/parents/children/roles relations that
        # GroupSerializer exposes; GroupSerializer uses this class to render
        # its parents_obj and children_obj fields.
        fields = [
            "pk",
            "name",
            "is_superuser",
            "attributes",
            "group_uuid",
        ]
147
148
class GroupSerializer(ModelSerializer):
    """Group Serializer"""

    attributes = JSONDictField(required=False)
    # Members are validated in bulk and handled as raw PKs (see validate_users).
    users = BulkPrimaryKeyRelatedField(queryset=User.objects.all(), many=True, default=list)
    parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False)
    # The *_obj method fields below are null unless the matching include_*
    # query parameter enables them.
    parents_obj = SerializerMethodField(allow_null=True)
    children_obj = SerializerMethodField(allow_null=True)
    users_obj = SerializerMethodField(allow_null=True)
    roles_obj = ListSerializer(
        child=RoleSerializer(),
        read_only=True,
        source="roles",
        required=False,
    )
    inherited_roles_obj = SerializerMethodField(allow_null=True)
    num_pk = IntegerField(read_only=True)

    def _query_flag(self, name: str, default: str) -> bool:
        # Shared parser for the include_* query parameters: only the
        # case-insensitive string "true" enables a flag. Without a request in
        # the serializer context every flag is treated as enabled.
        request: Request = self.context.get("request", None)
        if not request:
            return True
        return str(request.query_params.get(name, default)).lower() == "true"

    @property
    def _should_include_users(self) -> bool:
        # Enabled by default; ?include_users=false turns it off.
        return self._query_flag("include_users", "true")

    @property
    def _should_include_children(self) -> bool:
        # Disabled by default; ?include_children=true turns it on.
        return self._query_flag("include_children", "false")

    @property
    def _should_include_parents(self) -> bool:
        # Disabled by default; ?include_parents=true turns it on.
        return self._query_flag("include_parents", "false")

    @property
    def _should_include_inherited_roles(self) -> bool:
        # Disabled by default; ?include_inherited_roles=true turns it on.
        return self._query_flag("include_inherited_roles", "false")

    @extend_schema_field(PartialUserSerializer(many=True))
    def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
        if not self._should_include_users:
            return None
        return PartialUserSerializer(instance.users, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        if not self._should_include_children:
            return None
        return RelatedGroupSerializer(instance.children, many=True).data

    @extend_schema_field(RelatedGroupSerializer(many=True))
    def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
        if not self._should_include_parents:
            return None
        return RelatedGroupSerializer(instance.parents, many=True).data

    @extend_schema_field(RoleSerializer(many=True))
    def get_inherited_roles_obj(self, instance: Group) -> list | None:
        """Return only inherited roles from ancestor groups (excludes direct roles)"""
        if not self._should_include_inherited_roles:
            return None
        direct_role_pks = instance.roles.values_list("pk", flat=True)
        inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks)
        return RoleSerializer(inherited_roles, many=True).data

    def validate_is_superuser(self, superuser: bool):
        """Ensure that the user creating this group has permissions to set the superuser flag"""
        request: Request = self.context.get("request", None)
        if not request:
            return superuser
        # If we're updating an instance, and the state hasn't changed, we don't need to check perms
        if self.instance and superuser == self.instance.is_superuser:
            return superuser
        user: User = request.user
        perm = (
            "authentik_core.enable_group_superuser"
            if superuser
            else "authentik_core.disable_group_superuser"
        )
        # Creating a group with is_superuser=False needs no extra permission.
        if self.instance or superuser:
            # Accept either the global permission or one granted on this object.
            has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
            if not has_perm:
                # Translate the template first and interpolate afterwards;
                # formatting before gettext() would look up a string that can
                # never match a catalog entry.
                raise ValidationError(
                    _(
                        "User does not have permission to set "
                        "superuser status to {superuser_status}."
                    ).format(superuser_status=superuser)
                )
        return superuser

    def validate_users(self, users: list) -> list:
        """Require add_user_to_group permission when adding new members via group PATCH."""
        request: Request = self.context.get("request", None)
        if not request:
            return users
        # No existing instance means this is a create; no membership diff to check.
        if not self.instance:
            return users
        # BulkManyRelatedField returns raw PKs, not model instances
        current_user_pks = set(self.instance.users.values_list("pk", flat=True))
        new_users = [u for u in users if u not in current_user_pks]
        # Keeping or removing existing members does not require the permission.
        if not new_users:
            return users
        has_perm = request.user.has_perm(
            "authentik_core.add_user_to_group"
        ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance)
        if not has_perm:
            raise ValidationError(_("User does not have permission to add members to this group."))
        return users

    class Meta:
        model = Group
        fields = [
            "pk",
            "num_pk",
            "name",
            "is_superuser",
            "parents",
            "parents_obj",
            "users",
            "users_obj",
            "attributes",
            "roles",
            "roles_obj",
            "inherited_roles_obj",
            "children",
            "children_obj",
        ]
        extra_kwargs = {
            "children": {
                "required": False,
                "default": list,
            },
            "parents": {
                "required": False,
                "default": list,
            },
        }
296
297
class GroupFilter(FilterSet):
    """Filter for groups"""

    # JSON-encoded mapping matched against Group.attributes; see filter_attributes.
    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    # Groups that contain the given users, looked up by username or by PK.
    members_by_username = ModelMultipleChoiceFilter(
        field_name="users__username",
        to_field_name="username",
        queryset=User.objects.all(),
    )
    members_by_pk = ModelMultipleChoiceFilter(
        field_name="users",
        queryset=User.objects.all(),
        distinct=False,
    )

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args"""
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(detail="filter: failed to parse JSON") from None
        if not isinstance(parsed, dict):
            raise ValidationError(detail="filter: value must be key:value mapping")
        # Each top-level JSON key becomes a lookup into the attributes field.
        lookups = {f"attributes__{key}": expected for key, expected in parsed.items()}
        try:
            # Evaluate once up front so a ValueError from an unusable lookup
            # surfaces here and the unfiltered queryset is returned instead.
            len(queryset.filter(**lookups))
            return queryset.filter(**lookups)
        except ValueError:
            return queryset

    class Meta:
        model = Group
        fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]
339
340
class GroupViewSet(UsedByMixin, ModelViewSet):
    """Group Viewset"""

    class UserAccountSerializer(PassiveSerializer):
        """Account adding/removing operations"""

        # Primary key of the user to add or remove.
        pk = IntegerField(required=True)

    # Class-level queryset is empty; the queryset actually used is built in
    # get_queryset() below.
    queryset = Group.objects.none()
    serializer_class = GroupSerializer
    search_fields = ["name", "is_superuser"]
    filterset_class = GroupFilter
    ordering = ["name"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        # Fields exposed to the djangoql-based search.
        return [
            StrField(Group, "name"),
            BoolField(Group, "is_superuser", nullable=True),
            JSONSearchField(Group, "attributes"),
        ]

    def get_queryset(self):
        # Always prefetch parents and children since their PKs are always serialized
        base_qs = Group.objects.all().prefetch_related("roles", "parents", "children")

        # A throwaway serializer is instantiated only to reuse its parsing of
        # the include_users query parameter.
        if self.serializer_class(context={"request": self.request})._should_include_users:
            # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation
            # time
            base_qs = base_qs.prefetch_related(
                Prefetch(
                    "users",
                    queryset=User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS),
                )
            )
        # When include_users=false, skip users prefetch entirely.
        # BulkManyRelatedField.to_representation will use values_list to get PKs
        # directly without loading User instances into memory.

        return base_qs

    # list() and retrieve() are overridden only to attach the include_* query
    # parameters to the generated OpenAPI schema; both delegate to the parent.
    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def retrieve(self, request, *args, **kwargs):
        return super().retrieve(request, *args, **kwargs)

    @permission_required("authentik_core.add_user_to_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Add user to group"""
        group: Group = self.get_object()
        # Only users the requester is allowed to view can be added; anything
        # else is reported as 404.
        user: User = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(
                pk=body.validated_data.get("pk"),
            )
            .first()
        )
        if not user:
            raise Http404
        group.users.add(user)
        return Response(status=204)

    @permission_required("authentik_core.remove_user_from_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Remove user from group"""
        group: Group = self.get_object()
        # Same visibility-scoped lookup as add_user: users the requester
        # cannot view are reported as 404.
        user: User = (
            get_objects_for_user(request.user, "authentik_core.view_user")
            .filter(
                pk=body.validated_data.get("pk"),
            )
            .first()
        )
        if not user:
            raise Http404
        group.users.remove(user)
        return Response(status=204)
class BulkManyRelatedField(rest_framework.relations.ManyRelatedField):
class BulkManyRelatedField(ManyRelatedField):
    """ManyRelatedField that validates all PKs in a single query instead of one per PK."""

    def to_internal_value(self, data):
        """Validate a list of primary keys and return the coerced PKs.

        Unlike the stock implementation, this returns raw primary keys rather
        than model instances.

        Raises a ValidationError with code:
          * ``not_a_list`` when ``data`` is a string or not iterable,
          * ``empty`` when the list is empty and ``allow_empty`` is false,
          * ``incorrect_type`` when an item cannot be used as a primary key,
          * ``does_not_exist`` when a primary key is not in the child queryset.
        """
        if isinstance(data, str) or not hasattr(data, "__iter__"):
            self.fail("not_a_list", input_type=type(data).__name__)
        if not self.allow_empty and len(data) == 0:
            self.fail("empty")

        child = self.child_relation
        pk_field = child.pk_field
        # Coerce PKs through pk_field if defined
        pk_map = {}
        for item in data:
            # "incorrect_type" and "does_not_exist" are error keys of the child
            # relation; ManyRelatedField itself only defines "not_a_list" and
            # "empty", so calling self.fail() with them would raise an
            # AssertionError instead of a ValidationError.
            if isinstance(item, bool):
                child.fail("incorrect_type", data_type=type(item).__name__)
            pk = pk_field.to_internal_value(item) if pk_field else item
            try:
                pk_map[pk] = item  # map coerced PK -> original value for error reporting
            except TypeError:
                # Unhashable input (e.g. a nested list or dict) can never be a PK.
                child.fail("incorrect_type", data_type=type(item).__name__)

        queryset = child.get_queryset()
        # Use count to validate all PKs exist in a single query
        try:
            found_count = queryset.filter(pk__in=pk_map.keys()).count()
        except (TypeError, ValueError):
            # At least one value cannot be converted to the PK type; force the
            # per-PK path below so the offending value is reported.
            found_count = -1
        if found_count < len(pk_map):
            # Some PKs not found — fall back to per-PK checks for error reporting.
            # This only runs when there's an actual validation error (rare path).
            for pk, original in pk_map.items():
                try:
                    exists = queryset.filter(pk=pk).exists()
                except (TypeError, ValueError):
                    exists = None
                if exists is None:
                    child.fail("incorrect_type", data_type=type(original).__name__)
                if not exists:
                    child.fail("does_not_exist", pk_value=original)

        # Return raw PKs — Django's M2M set() accepts both objects and PKs,
        # using get_prep_value() for type coercion. This avoids loading all
        # objects into memory and avoids triggering post_init signals.
        return list(pk_map.keys())

    def to_representation(self, iterable):
        """Serialize the related objects as a list of primary keys."""
        # For non-prefetched querysets, get PKs directly without loading model instances.
        # When prefetched, _result_cache is a list (possibly empty); when not, it's None.
        if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None:
            return list(iterable.values_list("pk", flat=True))
        return super().to_representation(iterable)

ManyRelatedField that validates all PKs in a single query instead of one per PK.

def to_internal_value(self, data):
45    def to_internal_value(self, data):
46        if isinstance(data, str) or not hasattr(data, "__iter__"):
47            self.fail("not_a_list", input_type=type(data).__name__)
48        if not self.allow_empty and len(data) == 0:
49            self.fail("empty")
50
51        child = self.child_relation
52        pk_field = child.pk_field
53        # Coerce PKs through pk_field if defined
54        pk_map = {}
55        for item in data:
56            if isinstance(item, bool):
57                self.fail("incorrect_type", data_type=type(item).__name__)
58            pk = pk_field.to_internal_value(item) if pk_field else item
59            pk_map[pk] = item  # map coerced PK -> original value for error reporting
60
61        queryset = child.get_queryset()
62        # Use count to validate all PKs exist in a single query
63        found_count = queryset.filter(pk__in=pk_map.keys()).count()
64        if found_count < len(pk_map):
65            # Some PKs not found — fall back to per-PK checks for error reporting.
66            # This only runs when there's an actual validation error (rare path).
67            for pk, original in pk_map.items():
68                if not queryset.filter(pk=pk).exists():
69                    child.fail("does_not_exist", pk_value=original)
70
71        # Return raw PKs — Django's M2M set() accepts both objects and PKs,
72        # using get_prep_value() for type coercion. This avoids loading all
73        # objects into memory and avoids triggering post_init signals.
74        return list(pk_map.keys())

Transform the incoming primitive data into a native value.

def to_representation(self, iterable):
76    def to_representation(self, iterable):
77        # For non-prefetched querysets, get PKs directly without loading model instances.
78        # When prefetched, _result_cache is a list (possibly empty); when not, it's None.
79        if hasattr(iterable, "values_list") and getattr(iterable, "_result_cache", None) is None:
80            return list(iterable.values_list("pk", flat=True))
81        return super().to_representation(iterable)

Transform the outgoing native value into primitive data.

class BulkPrimaryKeyRelatedField(rest_framework.relations.PrimaryKeyRelatedField):
 84class BulkPrimaryKeyRelatedField(PrimaryKeyRelatedField):
 85    """PrimaryKeyRelatedField that uses bulk validation when many=True."""
 86
 87    @classmethod
 88    def many_init(cls, *args, **kwargs):
 89        allow_empty = kwargs.pop("allow_empty", None)
 90        max_length = kwargs.pop("max_length", None)
 91        min_length = kwargs.pop("min_length", None)
 92        child_relation = cls(*args, **kwargs)
 93        list_kwargs = {
 94            "child_relation": child_relation,
 95        }
 96        if allow_empty is not None:
 97            list_kwargs["allow_empty"] = allow_empty
 98        if max_length is not None:
 99            list_kwargs["max_length"] = max_length
100        if min_length is not None:
101            list_kwargs["min_length"] = min_length
102        list_kwargs.update(
103            {
104                key: value
105                for key, value in kwargs.items()
106                if key in ("required", "default", "source")
107            }
108        )
109        return BulkManyRelatedField(**list_kwargs)

PrimaryKeyRelatedField that uses bulk validation when many=True.

@classmethod
def many_init(cls, *args, **kwargs):
 87    @classmethod
 88    def many_init(cls, *args, **kwargs):
 89        allow_empty = kwargs.pop("allow_empty", None)
 90        max_length = kwargs.pop("max_length", None)
 91        min_length = kwargs.pop("min_length", None)
 92        child_relation = cls(*args, **kwargs)
 93        list_kwargs = {
 94            "child_relation": child_relation,
 95        }
 96        if allow_empty is not None:
 97            list_kwargs["allow_empty"] = allow_empty
 98        if max_length is not None:
 99            list_kwargs["max_length"] = max_length
100        if min_length is not None:
101            list_kwargs["min_length"] = min_length
102        list_kwargs.update(
103            {
104                key: value
105                for key, value in kwargs.items()
106                if key in ("required", "default", "source")
107            }
108        )
109        return BulkManyRelatedField(**list_kwargs)

This method handles creating a parent ManyRelatedField instance when the many=True keyword argument is passed.

Typically you won't need to override this method.

Note that we're over-cautious in passing most arguments to both parent and child classes in order to try to cover the general case. If you're overriding this method you'll probably want something much simpler, eg:

    @classmethod
    def many_init(cls, *args, **kwargs):
        kwargs['child'] = cls()
        return CustomManyRelatedField(*args, **kwargs)

PARTIAL_USER_SERIALIZER_MODEL_FIELDS = ['pk', 'username', 'name', 'is_active', 'last_login', 'email', 'attributes']
class PartialUserSerializer(authentik.core.api.utils.ModelSerializer):
123class PartialUserSerializer(ModelSerializer):
124    """Partial User Serializer, does not include child relations."""
125
126    attributes = JSONDictField(required=False)
127    uid = CharField(read_only=True)
128
129    class Meta:
130        model = User
131        fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"]

Partial User Serializer, does not include child relations.

attributes
uid
class PartialUserSerializer.Meta:
129    class Meta:
130        model = User
131        fields = PARTIAL_USER_SERIALIZER_MODEL_FIELDS + ["uid"]
model = <class 'authentik.core.models.User'>
fields = ['pk', 'username', 'name', 'is_active', 'last_login', 'email', 'attributes', 'uid']
class RelatedGroupSerializer(authentik.core.api.utils.ModelSerializer):
134class RelatedGroupSerializer(ModelSerializer):
135    """Stripped down group serializer to show relevant children/parents for groups"""
136
137    attributes = JSONDictField(required=False)
138
139    class Meta:
140        model = Group
141        fields = [
142            "pk",
143            "name",
144            "is_superuser",
145            "attributes",
146            "group_uuid",
147        ]

Stripped down group serializer to show relevant children/parents for groups

attributes
class RelatedGroupSerializer.Meta:
139    class Meta:
140        model = Group
141        fields = [
142            "pk",
143            "name",
144            "is_superuser",
145            "attributes",
146            "group_uuid",
147        ]
model = <class 'authentik.core.models.Group'>
fields = ['pk', 'name', 'is_superuser', 'attributes', 'group_uuid']
class GroupSerializer(authentik.core.api.utils.ModelSerializer):
150class GroupSerializer(ModelSerializer):
151    """Group Serializer"""
152
153    attributes = JSONDictField(required=False)
154    users = BulkPrimaryKeyRelatedField(queryset=User.objects.all(), many=True, default=list)
155    parents = PrimaryKeyRelatedField(queryset=Group.objects.all(), many=True, required=False)
156    parents_obj = SerializerMethodField(allow_null=True)
157    children_obj = SerializerMethodField(allow_null=True)
158    users_obj = SerializerMethodField(allow_null=True)
159    roles_obj = ListSerializer(
160        child=RoleSerializer(),
161        read_only=True,
162        source="roles",
163        required=False,
164    )
165    inherited_roles_obj = SerializerMethodField(allow_null=True)
166    num_pk = IntegerField(read_only=True)
167
168    @property
169    def _should_include_users(self) -> bool:
170        request: Request = self.context.get("request", None)
171        if not request:
172            return True
173        return str(request.query_params.get("include_users", "true")).lower() == "true"
174
175    @property
176    def _should_include_children(self) -> bool:
177        request: Request = self.context.get("request", None)
178        if not request:
179            return True
180        return str(request.query_params.get("include_children", "false")).lower() == "true"
181
182    @property
183    def _should_include_parents(self) -> bool:
184        request: Request = self.context.get("request", None)
185        if not request:
186            return True
187        return str(request.query_params.get("include_parents", "false")).lower() == "true"
188
189    @property
190    def _should_include_inherited_roles(self) -> bool:
191        request: Request = self.context.get("request", None)
192        if not request:
193            return True
194        return str(request.query_params.get("include_inherited_roles", "false")).lower() == "true"
195
196    @extend_schema_field(PartialUserSerializer(many=True))
197    def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
198        if not self._should_include_users:
199            return None
200        return PartialUserSerializer(instance.users, many=True).data
201
202    @extend_schema_field(RelatedGroupSerializer(many=True))
203    def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
204        if not self._should_include_children:
205            return None
206        return RelatedGroupSerializer(instance.children, many=True).data
207
208    @extend_schema_field(RelatedGroupSerializer(many=True))
209    def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
210        if not self._should_include_parents:
211            return None
212        return RelatedGroupSerializer(instance.parents, many=True).data
213
214    @extend_schema_field(RoleSerializer(many=True))
215    def get_inherited_roles_obj(self, instance: Group) -> list | None:
216        """Return only inherited roles from ancestor groups (excludes direct roles)"""
217        if not self._should_include_inherited_roles:
218            return None
219        direct_role_pks = instance.roles.values_list("pk", flat=True)
220        inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks)
221        return RoleSerializer(inherited_roles, many=True).data
222
223    def validate_is_superuser(self, superuser: bool):
224        """Ensure that the user creating this group has permissions to set the superuser flag"""
225        request: Request = self.context.get("request", None)
226        if not request:
227            return superuser
228        # If we're updating an instance, and the state hasn't changed, we don't need to check perms
229        if self.instance and superuser == self.instance.is_superuser:
230            return superuser
231        user: User = request.user
232        perm = (
233            "authentik_core.enable_group_superuser"
234            if superuser
235            else "authentik_core.disable_group_superuser"
236        )
237        if self.instance or superuser:
238            has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
239            if not has_perm:
240                raise ValidationError(
241                    _(
242                        (
243                            "User does not have permission to set "
244                            "superuser status to {superuser_status}."
245                        ).format_map({"superuser_status": superuser})
246                    )
247                )
248        return superuser
249
250    def validate_users(self, users: list) -> list:
251        """Require add_user_to_group permission when adding new members via group PATCH."""
252        request: Request = self.context.get("request", None)
253        if not request:
254            return users
255        if not self.instance:
256            return users
257        # BulkManyRelatedField returns raw PKs, not model instances
258        current_user_pks = set(self.instance.users.values_list("pk", flat=True))
259        new_users = [u for u in users if u not in current_user_pks]
260        if not new_users:
261            return users
262        has_perm = request.user.has_perm(
263            "authentik_core.add_user_to_group"
264        ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance)
265        if not has_perm:
266            raise ValidationError(_("User does not have permission to add members to this group."))
267        return users
268
269    class Meta:
270        model = Group
271        fields = [
272            "pk",
273            "num_pk",
274            "name",
275            "is_superuser",
276            "parents",
277            "parents_obj",
278            "users",
279            "users_obj",
280            "attributes",
281            "roles",
282            "roles_obj",
283            "inherited_roles_obj",
284            "children",
285            "children_obj",
286        ]
287        extra_kwargs = {
288            "children": {
289                "required": False,
290                "default": list,
291            },
292            "parents": {
293                "required": False,
294                "default": list,
295            },
296        }

Group Serializer

attributes
users
parents
parents_obj
children_obj
users_obj
roles_obj
inherited_roles_obj
num_pk
@extend_schema_field(PartialUserSerializer(many=True))
def get_users_obj( self, instance: authentik.core.models.Group) -> list[PartialUserSerializer] | None:
196    @extend_schema_field(PartialUserSerializer(many=True))
197    def get_users_obj(self, instance: Group) -> list[PartialUserSerializer] | None:
198        if not self._should_include_users:
199            return None
200        return PartialUserSerializer(instance.users, many=True).data
@extend_schema_field(RelatedGroupSerializer(many=True))
def get_children_obj( self, instance: authentik.core.models.Group) -> list[RelatedGroupSerializer] | None:
202    @extend_schema_field(RelatedGroupSerializer(many=True))
203    def get_children_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
204        if not self._should_include_children:
205            return None
206        return RelatedGroupSerializer(instance.children, many=True).data
@extend_schema_field(RelatedGroupSerializer(many=True))
def get_parents_obj( self, instance: authentik.core.models.Group) -> list[RelatedGroupSerializer] | None:
208    @extend_schema_field(RelatedGroupSerializer(many=True))
209    def get_parents_obj(self, instance: Group) -> list[RelatedGroupSerializer] | None:
210        if not self._should_include_parents:
211            return None
212        return RelatedGroupSerializer(instance.parents, many=True).data
@extend_schema_field(RoleSerializer(many=True))
def get_inherited_roles_obj(self, instance: authentik.core.models.Group) -> list | None:
214    @extend_schema_field(RoleSerializer(many=True))
215    def get_inherited_roles_obj(self, instance: Group) -> list | None:
216        """Return only inherited roles from ancestor groups (excludes direct roles)"""
217        if not self._should_include_inherited_roles:
218            return None
219        direct_role_pks = instance.roles.values_list("pk", flat=True)
220        inherited_roles = instance.all_roles().exclude(pk__in=direct_role_pks)
221        return RoleSerializer(inherited_roles, many=True).data

Return only inherited roles from ancestor groups (excludes direct roles)

def validate_is_superuser(self, superuser: bool):
223    def validate_is_superuser(self, superuser: bool):
224        """Ensure that the user creating this group has permissions to set the superuser flag"""
225        request: Request = self.context.get("request", None)
226        if not request:
227            return superuser
228        # If we're updating an instance, and the state hasn't changed, we don't need to check perms
229        if self.instance and superuser == self.instance.is_superuser:
230            return superuser
231        user: User = request.user
232        perm = (
233            "authentik_core.enable_group_superuser"
234            if superuser
235            else "authentik_core.disable_group_superuser"
236        )
237        if self.instance or superuser:
238            has_perm = user.has_perm(perm) or user.has_perm(perm, self.instance)
239            if not has_perm:
240                raise ValidationError(
241                    _(
242                        (
243                            "User does not have permission to set "
244                            "superuser status to {superuser_status}."
245                        ).format_map({"superuser_status": superuser})
246                    )
247                )
248        return superuser

Ensure that the user creating this group has permissions to set the superuser flag

def validate_users(self, users: list) -> list:
250    def validate_users(self, users: list) -> list:
251        """Require add_user_to_group permission when adding new members via group PATCH."""
252        request: Request = self.context.get("request", None)
253        if not request:
254            return users
255        if not self.instance:
256            return users
257        # BulkManyRelatedField returns raw PKs, not model instances
258        current_user_pks = set(self.instance.users.values_list("pk", flat=True))
259        new_users = [u for u in users if u not in current_user_pks]
260        if not new_users:
261            return users
262        has_perm = request.user.has_perm(
263            "authentik_core.add_user_to_group"
264        ) or request.user.has_perm("authentik_core.add_user_to_group", self.instance)
265        if not has_perm:
266            raise ValidationError(_("User does not have permission to add members to this group."))
267        return users

Require add_user_to_group permission when adding new members via group PATCH.

class GroupSerializer.Meta:
269    class Meta:
270        model = Group
271        fields = [
272            "pk",
273            "num_pk",
274            "name",
275            "is_superuser",
276            "parents",
277            "parents_obj",
278            "users",
279            "users_obj",
280            "attributes",
281            "roles",
282            "roles_obj",
283            "inherited_roles_obj",
284            "children",
285            "children_obj",
286        ]
287        extra_kwargs = {
288            "children": {
289                "required": False,
290                "default": list,
291            },
292            "parents": {
293                "required": False,
294                "default": list,
295            },
296        }
model = <class 'authentik.core.models.Group'>
fields = ['pk', 'num_pk', 'name', 'is_superuser', 'parents', 'parents_obj', 'users', 'users_obj', 'attributes', 'roles', 'roles_obj', 'inherited_roles_obj', 'children', 'children_obj']
extra_kwargs = {'children': {'required': False, 'default': <class 'list'>}, 'parents': {'required': False, 'default': <class 'list'>}}
class GroupFilter(django_filters.filterset.FilterSet):
class GroupFilter(FilterSet):
    """Filter for groups"""

    # JSON-encoded mapping matched against the group's attributes; handled by
    # filter_attributes below.
    attributes = CharFilter(
        field_name="attributes",
        lookup_expr="",
        label="Attributes",
        method="filter_attributes",
    )

    # Select groups by the usernames of their members.
    members_by_username = ModelMultipleChoiceFilter(
        field_name="users__username",
        to_field_name="username",
        queryset=User.objects.all(),
    )
    # Select groups by the primary keys of their members.
    members_by_pk = ModelMultipleChoiceFilter(
        field_name="users",
        queryset=User.objects.all(),
        distinct=False,
    )

    def filter_attributes(self, queryset, name, value):
        """Filter attributes by query args

        `value` must be a JSON object; each key/value pair becomes an
        `attributes__<key>` lookup. Invalid JSON or a non-object value raises
        ValidationError; a ValueError while applying the lookups returns the
        queryset unfiltered.
        """
        try:
            parsed = loads(value)
        except ValueError:
            raise ValidationError(detail="filter: failed to parse JSON") from None
        if not isinstance(parsed, dict):
            raise ValidationError(detail="filter: value must be key:value mapping")
        lookups = {f"attributes__{key}": wanted for key, wanted in parsed.items()}
        try:
            # len() forces the filtered query to run inside this try block.
            len(queryset.filter(**lookups))
            return queryset.filter(**lookups)
        except ValueError:
            return queryset

    class Meta:
        model = Group
        fields = [
            "name",
            "is_superuser",
            "members_by_pk",
            "attributes",
            "members_by_username",
        ]

Filter for groups

attributes
members_by_username
members_by_pk
def filter_attributes(self, queryset, name, value):
320    def filter_attributes(self, queryset, name, value):
321        """Filter attributes by query args"""
322        try:
323            value = loads(value)
324        except ValueError:
325            raise ValidationError(detail="filter: failed to parse JSON") from None
326        if not isinstance(value, dict):
327            raise ValidationError(detail="filter: value must be key:value mapping")
328        qs = {}
329        for key, _value in value.items():
330            qs[f"attributes__{key}"] = _value
331        try:
332            _ = len(queryset.filter(**qs))
333            return queryset.filter(**qs)
334        except ValueError:
335            return queryset

Filter attributes by query args

declared_filters = OrderedDict({'attributes': <django_filters.filters.CharFilter object>, 'members_by_username': <django_filters.filters.ModelMultipleChoiceFilter object>, 'members_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
base_filters = OrderedDict({'name': <django_filters.filters.CharFilter object>, 'is_superuser': <django_filters.filters.BooleanFilter object>, 'members_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>, 'attributes': <django_filters.filters.CharFilter object>, 'members_by_username': <django_filters.filters.ModelMultipleChoiceFilter object>})
class GroupFilter.Meta:
337    class Meta:
338        model = Group
339        fields = ["name", "is_superuser", "members_by_pk", "attributes", "members_by_username"]
model = <class 'authentik.core.models.Group'>
fields = ['name', 'is_superuser', 'members_by_pk', 'attributes', 'members_by_username']
class GroupViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class GroupViewSet(UsedByMixin, ModelViewSet):
    """Group Viewset"""

    class UserAccountSerializer(PassiveSerializer):
        """Account adding/removing operations"""

        # Primary key of the user to add or remove.
        pk = IntegerField(required=True)

    # Placeholder only; the effective queryset is built in get_queryset().
    queryset = Group.objects.none()
    serializer_class = GroupSerializer
    search_fields = ["name", "is_superuser"]
    filterset_class = GroupFilter
    ordering = ["name"]
    authentication_classes = [
        TokenAuthentication,
        SessionAuthentication,
        AgentAuth,
    ]

    def get_ql_fields(self):
        # Fields of Group made available to QL search.
        name_field = StrField(Group, "name")
        superuser_field = BoolField(Group, "is_superuser", nullable=True)
        attributes_field = JSONSearchField(Group, "attributes")
        return [name_field, superuser_field, attributes_field]

    def get_queryset(self):
        # Always prefetch parents and children since their PKs are always serialized
        groups = Group.objects.all().prefetch_related("roles", "parents", "children")

        # A throwaway serializer is built only to read the include_users decision.
        probe = self.serializer_class(context={"request": self.request})
        if not probe._should_include_users:
            # When include_users=false, skip users prefetch entirely.
            # BulkManyRelatedField.to_representation will use values_list to get PKs
            # directly without loading User instances into memory.
            return groups

        # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation
        # time
        slim_users = User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS)
        return groups.prefetch_related(Prefetch("users", queryset=slim_users))

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def list(self, request, *args, **kwargs):
        # Pass-through to the parent implementation; the decorator above documents
        # the include_* query parameters.
        response = super().list(request, *args, **kwargs)
        return response

    @extend_schema(
        parameters=[
            OpenApiParameter("include_users", bool, default=True),
            OpenApiParameter("include_children", bool, default=False),
            OpenApiParameter("include_parents", bool, default=False),
            OpenApiParameter("include_inherited_roles", bool, default=False),
        ]
    )
    def retrieve(self, request, *args, **kwargs):
        # Pass-through to the parent implementation; the decorator above documents
        # the include_* query parameters.
        response = super().retrieve(request, *args, **kwargs)
        return response

    @permission_required("authentik_core.add_user_to_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User added"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Add user to group"""
        group: Group = self.get_object()
        # Only users the requester holds view_user on can be targeted.
        viewable_users = get_objects_for_user(request.user, "authentik_core.view_user")
        user: User = viewable_users.filter(pk=body.validated_data.get("pk")).first()
        if user:
            group.users.add(user)
            return Response(status=204)
        raise Http404

    @permission_required("authentik_core.remove_user_from_group")
    @extend_schema(
        request=UserAccountSerializer,
        responses={
            204: OpenApiResponse(description="User removed"),
            404: OpenApiResponse(description="User not found"),
        },
    )
    @action(
        detail=True,
        methods=["POST"],
        pagination_class=None,
        filter_backends=[],
        permission_classes=[IsAuthenticated],
    )
    @validate(UserAccountSerializer)
    def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
        """Remove user from group"""
        group: Group = self.get_object()
        # Only users the requester holds view_user on can be targeted.
        viewable_users = get_objects_for_user(request.user, "authentik_core.view_user")
        user: User = viewable_users.filter(pk=body.validated_data.get("pk")).first()
        if user:
            group.users.remove(user)
            return Response(status=204)
        raise Http404

Group Viewset

queryset = <GroupQuerySet []>
serializer_class = <class 'GroupSerializer'>
search_fields = ['name', 'is_superuser']
filterset_class = <class 'GroupFilter'>
ordering = ['name']
authentication_classes = [<class 'authentik.api.authentication.TokenAuthentication'>, <class 'rest_framework.authentication.SessionAuthentication'>, <class 'authentik.endpoints.connectors.agent.auth.AgentAuth'>]
def get_ql_fields(self):
361    def get_ql_fields(self):
362        return [
363            StrField(Group, "name"),
364            BoolField(Group, "is_superuser", nullable=True),
365            JSONSearchField(Group, "attributes"),
366        ]
def get_queryset(self):
368    def get_queryset(self):
369        # Always prefetch parents and children since their PKs are always serialized
370        base_qs = Group.objects.all().prefetch_related("roles", "parents", "children")
371
372        if self.serializer_class(context={"request": self.request})._should_include_users:
373            # Only fetch fields needed by PartialUserSerializer to reduce DB load and instantiation
374            # time
375            base_qs = base_qs.prefetch_related(
376                Prefetch(
377                    "users",
378                    queryset=User.objects.all().only(*PARTIAL_USER_SERIALIZER_MODEL_FIELDS),
379                )
380            )
381        # When include_users=false, skip users prefetch entirely.
382        # BulkManyRelatedField.to_representation will use values_list to get PKs
383        # directly without loading User instances into memory.
384
385        return base_qs

Get the list of items for this view. This must be an iterable, and may be a queryset. Defaults to using self.queryset.

This method should always be used rather than accessing self.queryset directly, as self.queryset gets evaluated only once, and those results are cached for all subsequent requests.

You may want to override this if you need to provide different querysets depending on the incoming request.

(Eg. return a list of items that is specific to the user)

@extend_schema(parameters=[OpenApiParameter('include_users', bool, default=True), OpenApiParameter('include_children', bool, default=False), OpenApiParameter('include_parents', bool, default=False), OpenApiParameter('include_inherited_roles', bool, default=False)])
def list(self, request, *args, **kwargs):
387    @extend_schema(
388        parameters=[
389            OpenApiParameter("include_users", bool, default=True),
390            OpenApiParameter("include_children", bool, default=False),
391            OpenApiParameter("include_parents", bool, default=False),
392            OpenApiParameter("include_inherited_roles", bool, default=False),
393        ]
394    )
395    def list(self, request, *args, **kwargs):
396        return super().list(request, *args, **kwargs)
@extend_schema(parameters=[OpenApiParameter('include_users', bool, default=True), OpenApiParameter('include_children', bool, default=False), OpenApiParameter('include_parents', bool, default=False), OpenApiParameter('include_inherited_roles', bool, default=False)])
def retrieve(self, request, *args, **kwargs):
398    @extend_schema(
399        parameters=[
400            OpenApiParameter("include_users", bool, default=True),
401            OpenApiParameter("include_children", bool, default=False),
402            OpenApiParameter("include_parents", bool, default=False),
403            OpenApiParameter("include_inherited_roles", bool, default=False),
404        ]
405    )
406    def retrieve(self, request, *args, **kwargs):
407        return super().retrieve(request, *args, **kwargs)
@permission_required('authentik_core.add_user_to_group')
@extend_schema(request=UserAccountSerializer, responses={204: OpenApiResponse(description='User added'), 404: OpenApiResponse(description='User not found')})
@action(detail=True, methods=['POST'], pagination_class=None, filter_backends=[], permission_classes=[IsAuthenticated])
@validate(UserAccountSerializer)
def add_user( self, request: rest_framework.request.Request, body: GroupViewSet.UserAccountSerializer, pk: str) -> rest_framework.response.Response:
409    @permission_required("authentik_core.add_user_to_group")
410    @extend_schema(
411        request=UserAccountSerializer,
412        responses={
413            204: OpenApiResponse(description="User added"),
414            404: OpenApiResponse(description="User not found"),
415        },
416    )
417    @action(
418        detail=True,
419        methods=["POST"],
420        pagination_class=None,
421        filter_backends=[],
422        permission_classes=[IsAuthenticated],
423    )
424    @validate(UserAccountSerializer)
425    def add_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
426        """Add user to group"""
427        group: Group = self.get_object()
428        user: User = (
429            get_objects_for_user(request.user, "authentik_core.view_user")
430            .filter(
431                pk=body.validated_data.get("pk"),
432            )
433            .first()
434        )
435        if not user:
436            raise Http404
437        group.users.add(user)
438        return Response(status=204)

Add user to group

@permission_required('authentik_core.remove_user_from_group')
@extend_schema(request=UserAccountSerializer, responses={204: OpenApiResponse(description='User removed'), 404: OpenApiResponse(description='User not found')})
@action(detail=True, methods=['POST'], pagination_class=None, filter_backends=[], permission_classes=[IsAuthenticated])
@validate(UserAccountSerializer)
def remove_user( self, request: rest_framework.request.Request, body: GroupViewSet.UserAccountSerializer, pk: str) -> rest_framework.response.Response:
440    @permission_required("authentik_core.remove_user_from_group")
441    @extend_schema(
442        request=UserAccountSerializer,
443        responses={
444            204: OpenApiResponse(description="User removed"),
445            404: OpenApiResponse(description="User not found"),
446        },
447    )
448    @action(
449        detail=True,
450        methods=["POST"],
451        pagination_class=None,
452        filter_backends=[],
453        permission_classes=[IsAuthenticated],
454    )
455    @validate(UserAccountSerializer)
456    def remove_user(self, request: Request, body: UserAccountSerializer, pk: str) -> Response:
457        """Remove user from group"""
458        group: Group = self.get_object()
459        user: User = (
460            get_objects_for_user(request.user, "authentik_core.view_user")
461            .filter(
462                pk=body.validated_data.get("pk"),
463            )
464            .first()
465        )
466        if not user:
467            raise Http404
468        group.users.remove(user)
469        return Response(status=204)

Remove user from group

name = None
description = None
suffix = None
detail = None
basename = None
class GroupViewSet.UserAccountSerializer(authentik.core.api.utils.PassiveSerializer):
    class UserAccountSerializer(PassiveSerializer):
        """Account adding/removing operations"""

        # Primary key of the user to add or remove; required in the request body.
        pk = IntegerField(required=True)

Account adding/removing operations

pk