authentik.sources.scim.views.v2.users

SCIM User Views

  1"""SCIM User Views"""
  2
  3from uuid import uuid4
  4
  5from django.db.models import Q
  6from django.db.transaction import atomic
  7from django.http import QueryDict
  8from django.urls import reverse
  9from pydanticscim.user import EmailKind, Name
 10from rest_framework.exceptions import ValidationError
 11from rest_framework.request import Request
 12from rest_framework.response import Response
 13
 14from authentik.core.models import User
 15from authentik.providers.scim.clients.schema import SCIM_USER_SCHEMA, Email
 16from authentik.providers.scim.clients.schema import User as SCIMUserModel
 17from authentik.sources.scim.models import SCIMSourceUser
 18from authentik.sources.scim.patch.processor import SCIMPatchProcessor
 19from authentik.sources.scim.views.v2.base import SCIMObjectView
 20from authentik.sources.scim.views.v2.exceptions import SCIMConflictError, SCIMNotFoundError
 21
 22
 23class UsersView(SCIMObjectView):
 24    """SCIM User view"""
 25
 26    model = User
 27
 28    def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
 29        """Convert User to SCIM data"""
 30        payload = SCIMUserModel(
 31            schemas=[SCIM_USER_SCHEMA],
 32            id=str(scim_user.user.uuid),
 33            externalId=scim_user.external_id,
 34            userName=scim_user.user.username,
 35            name=Name(
 36                formatted=scim_user.user.name,
 37            ),
 38            displayName=scim_user.user.name,
 39            active=scim_user.user.is_active,
 40            emails=(
 41                [Email(value=scim_user.user.email, type=EmailKind.work, primary=True)]
 42                if scim_user.user.email
 43                else []
 44            ),
 45            meta={
 46                "resourceType": "User",
 47                "created": scim_user.user.date_joined,
 48                "lastModified": scim_user.last_update,
 49                "location": self.request.build_absolute_uri(
 50                    reverse(
 51                        "authentik_sources_scim:v2-users",
 52                        kwargs={
 53                            "source_slug": self.kwargs["source_slug"],
 54                            "user_id": str(scim_user.user.uuid),
 55                        },
 56                    )
 57                ),
 58            },
 59        )
 60        final_payload = payload.model_dump(mode="json", exclude_unset=True)
 61        final_payload.update(scim_user.attributes)
 62        return self.remove_excluded_attributes(
 63            SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True)
 64        )
 65
 66    def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
 67        """List User handler"""
 68        if user_id:
 69            connection = (
 70                SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id)
 71                .select_related("user")
 72                .first()
 73            )
 74            if not connection:
 75                raise SCIMNotFoundError("User not found.")
 76            return Response(self.user_to_scim(connection))
 77        connections = (
 78            SCIMSourceUser.objects.filter(source=self.source).select_related("user").order_by("pk")
 79        )
 80        connections = connections.filter(self.filter_parse(request))
 81        page = self.paginate_query(connections)
 82        return Response(
 83            {
 84                "totalResults": page.paginator.count,
 85                "itemsPerPage": page.paginator.per_page,
 86                "startIndex": page.start_index(),
 87                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
 88                "Resources": [self.user_to_scim(connection) for connection in page],
 89            }
 90        )
 91
 92    @atomic
 93    def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
 94        """Partial update a user"""
 95        properties = self.build_object_properties(data)
 96
 97        if not properties.get("username"):
 98            raise ValidationError("Invalid user")
 99
100        user = connection.user if connection else User()
101        if _user := User.objects.filter(username=properties.get("username")).first():
102            user = _user
103        user.update_attributes(properties)
104
105        if not connection:
106            connection, _ = SCIMSourceUser.objects.update_or_create(
107                external_id=data.get("externalId") or str(uuid4()),
108                source=self.source,
109                user=user,
110                defaults={
111                    "attributes": data,
112                },
113            )
114        else:
115            connection.external_id = data.get("externalId", connection.external_id)
116            connection.attributes = data
117            connection.save()
118        return connection
119
120    def post(self, request: Request, **kwargs) -> Response:
121        """Create user handler"""
122        connection = SCIMSourceUser.objects.filter(
123            Q(
124                Q(user__uuid=request.data.get("id"))
125                | Q(user__username=request.data.get("userName"))
126            ),
127            source=self.source,
128        ).first()
129        if connection:
130            self.logger.debug("Found existing user")
131            raise SCIMConflictError("Group with ID exists already.")
132        connection = self.update_user(None, request.data)
133        return Response(self.user_to_scim(connection), status=201)
134
135    def patch(self, request: Request, user_id: str, **kwargs):
136        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
137        if not connection:
138            raise SCIMNotFoundError("User not found.")
139        patcher = SCIMPatchProcessor()
140        patched_data = patcher.apply_patches(
141            connection.attributes, request.data.get("Operations", [])
142        )
143        if patched_data != connection.attributes:
144            self.update_user(connection, patched_data)
145        return Response(self.user_to_scim(connection), status=200)
146
147    def put(self, request: Request, user_id: str, **kwargs) -> Response:
148        """Update user handler"""
149        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
150        if not connection:
151            raise SCIMNotFoundError("User not found.")
152        self.update_user(connection, request.data)
153        return Response(self.user_to_scim(connection), status=200)
154
155    @atomic
156    def delete(self, request: Request, user_id: str, **kwargs) -> Response:
157        """Delete user handler"""
158        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
159        if not connection:
160            raise SCIMNotFoundError("User not found.")
161        connection.user.delete()
162        connection.delete()
163        return Response(status=204)
 24class UsersView(SCIMObjectView):
 25    """SCIM User view"""
 26
 27    model = User
 28
 29    def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
 30        """Convert User to SCIM data"""
 31        payload = SCIMUserModel(
 32            schemas=[SCIM_USER_SCHEMA],
 33            id=str(scim_user.user.uuid),
 34            externalId=scim_user.external_id,
 35            userName=scim_user.user.username,
 36            name=Name(
 37                formatted=scim_user.user.name,
 38            ),
 39            displayName=scim_user.user.name,
 40            active=scim_user.user.is_active,
 41            emails=(
 42                [Email(value=scim_user.user.email, type=EmailKind.work, primary=True)]
 43                if scim_user.user.email
 44                else []
 45            ),
 46            meta={
 47                "resourceType": "User",
 48                "created": scim_user.user.date_joined,
 49                "lastModified": scim_user.last_update,
 50                "location": self.request.build_absolute_uri(
 51                    reverse(
 52                        "authentik_sources_scim:v2-users",
 53                        kwargs={
 54                            "source_slug": self.kwargs["source_slug"],
 55                            "user_id": str(scim_user.user.uuid),
 56                        },
 57                    )
 58                ),
 59            },
 60        )
 61        final_payload = payload.model_dump(mode="json", exclude_unset=True)
 62        final_payload.update(scim_user.attributes)
 63        return self.remove_excluded_attributes(
 64            SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True)
 65        )
 66
 67    def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
 68        """List User handler"""
 69        if user_id:
 70            connection = (
 71                SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id)
 72                .select_related("user")
 73                .first()
 74            )
 75            if not connection:
 76                raise SCIMNotFoundError("User not found.")
 77            return Response(self.user_to_scim(connection))
 78        connections = (
 79            SCIMSourceUser.objects.filter(source=self.source).select_related("user").order_by("pk")
 80        )
 81        connections = connections.filter(self.filter_parse(request))
 82        page = self.paginate_query(connections)
 83        return Response(
 84            {
 85                "totalResults": page.paginator.count,
 86                "itemsPerPage": page.paginator.per_page,
 87                "startIndex": page.start_index(),
 88                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
 89                "Resources": [self.user_to_scim(connection) for connection in page],
 90            }
 91        )
 92
 93    @atomic
 94    def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
 95        """Partial update a user"""
 96        properties = self.build_object_properties(data)
 97
 98        if not properties.get("username"):
 99            raise ValidationError("Invalid user")
100
101        user = connection.user if connection else User()
102        if _user := User.objects.filter(username=properties.get("username")).first():
103            user = _user
104        user.update_attributes(properties)
105
106        if not connection:
107            connection, _ = SCIMSourceUser.objects.update_or_create(
108                external_id=data.get("externalId") or str(uuid4()),
109                source=self.source,
110                user=user,
111                defaults={
112                    "attributes": data,
113                },
114            )
115        else:
116            connection.external_id = data.get("externalId", connection.external_id)
117            connection.attributes = data
118            connection.save()
119        return connection
120
121    def post(self, request: Request, **kwargs) -> Response:
122        """Create user handler"""
123        connection = SCIMSourceUser.objects.filter(
124            Q(
125                Q(user__uuid=request.data.get("id"))
126                | Q(user__username=request.data.get("userName"))
127            ),
128            source=self.source,
129        ).first()
130        if connection:
131            self.logger.debug("Found existing user")
132            raise SCIMConflictError("Group with ID exists already.")
133        connection = self.update_user(None, request.data)
134        return Response(self.user_to_scim(connection), status=201)
135
136    def patch(self, request: Request, user_id: str, **kwargs):
137        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
138        if not connection:
139            raise SCIMNotFoundError("User not found.")
140        patcher = SCIMPatchProcessor()
141        patched_data = patcher.apply_patches(
142            connection.attributes, request.data.get("Operations", [])
143        )
144        if patched_data != connection.attributes:
145            self.update_user(connection, patched_data)
146        return Response(self.user_to_scim(connection), status=200)
147
148    def put(self, request: Request, user_id: str, **kwargs) -> Response:
149        """Update user handler"""
150        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
151        if not connection:
152            raise SCIMNotFoundError("User not found.")
153        self.update_user(connection, request.data)
154        return Response(self.user_to_scim(connection), status=200)
155
156    @atomic
157    def delete(self, request: Request, user_id: str, **kwargs) -> Response:
158        """Delete user handler"""
159        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
160        if not connection:
161            raise SCIMNotFoundError("User not found.")
162        connection.user.delete()
163        connection.delete()
164        return Response(status=204)

SCIM User view

model = <class 'authentik.core.models.User'>
def user_to_scim(self, scim_user: authentik.sources.scim.models.SCIMSourceUser) -> dict:
29    def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
30        """Convert User to SCIM data"""
31        payload = SCIMUserModel(
32            schemas=[SCIM_USER_SCHEMA],
33            id=str(scim_user.user.uuid),
34            externalId=scim_user.external_id,
35            userName=scim_user.user.username,
36            name=Name(
37                formatted=scim_user.user.name,
38            ),
39            displayName=scim_user.user.name,
40            active=scim_user.user.is_active,
41            emails=(
42                [Email(value=scim_user.user.email, type=EmailKind.work, primary=True)]
43                if scim_user.user.email
44                else []
45            ),
46            meta={
47                "resourceType": "User",
48                "created": scim_user.user.date_joined,
49                "lastModified": scim_user.last_update,
50                "location": self.request.build_absolute_uri(
51                    reverse(
52                        "authentik_sources_scim:v2-users",
53                        kwargs={
54                            "source_slug": self.kwargs["source_slug"],
55                            "user_id": str(scim_user.user.uuid),
56                        },
57                    )
58                ),
59            },
60        )
61        final_payload = payload.model_dump(mode="json", exclude_unset=True)
62        final_payload.update(scim_user.attributes)
63        return self.remove_excluded_attributes(
64            SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True)
65        )

Convert User to SCIM data

def get( self, request: rest_framework.request.Request, user_id: str | None = None, **kwargs) -> rest_framework.response.Response:
67    def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
68        """List User handler"""
69        if user_id:
70            connection = (
71                SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id)
72                .select_related("user")
73                .first()
74            )
75            if not connection:
76                raise SCIMNotFoundError("User not found.")
77            return Response(self.user_to_scim(connection))
78        connections = (
79            SCIMSourceUser.objects.filter(source=self.source).select_related("user").order_by("pk")
80        )
81        connections = connections.filter(self.filter_parse(request))
82        page = self.paginate_query(connections)
83        return Response(
84            {
85                "totalResults": page.paginator.count,
86                "itemsPerPage": page.paginator.per_page,
87                "startIndex": page.start_index(),
88                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
89                "Resources": [self.user_to_scim(connection) for connection in page],
90            }
91        )

List User handler

@atomic
def update_user( self, connection: authentik.sources.scim.models.SCIMSourceUser | None, data: django.http.request.QueryDict):
 93    @atomic
 94    def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
 95        """Partial update a user"""
 96        properties = self.build_object_properties(data)
 97
 98        if not properties.get("username"):
 99            raise ValidationError("Invalid user")
100
101        user = connection.user if connection else User()
102        if _user := User.objects.filter(username=properties.get("username")).first():
103            user = _user
104        user.update_attributes(properties)
105
106        if not connection:
107            connection, _ = SCIMSourceUser.objects.update_or_create(
108                external_id=data.get("externalId") or str(uuid4()),
109                source=self.source,
110                user=user,
111                defaults={
112                    "attributes": data,
113                },
114            )
115        else:
116            connection.external_id = data.get("externalId", connection.external_id)
117            connection.attributes = data
118            connection.save()
119        return connection

Partial update a user

def post( self, request: rest_framework.request.Request, **kwargs) -> rest_framework.response.Response:
121    def post(self, request: Request, **kwargs) -> Response:
122        """Create user handler"""
123        connection = SCIMSourceUser.objects.filter(
124            Q(
125                Q(user__uuid=request.data.get("id"))
126                | Q(user__username=request.data.get("userName"))
127            ),
128            source=self.source,
129        ).first()
130        if connection:
131            self.logger.debug("Found existing user")
132            raise SCIMConflictError("Group with ID exists already.")
133        connection = self.update_user(None, request.data)
134        return Response(self.user_to_scim(connection), status=201)

Create user handler

def patch( self, request: rest_framework.request.Request, user_id: str, **kwargs):
136    def patch(self, request: Request, user_id: str, **kwargs):
137        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
138        if not connection:
139            raise SCIMNotFoundError("User not found.")
140        patcher = SCIMPatchProcessor()
141        patched_data = patcher.apply_patches(
142            connection.attributes, request.data.get("Operations", [])
143        )
144        if patched_data != connection.attributes:
145            self.update_user(connection, patched_data)
146        return Response(self.user_to_scim(connection), status=200)
def put( self, request: rest_framework.request.Request, user_id: str, **kwargs) -> rest_framework.response.Response:
148    def put(self, request: Request, user_id: str, **kwargs) -> Response:
149        """Update user handler"""
150        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
151        if not connection:
152            raise SCIMNotFoundError("User not found.")
153        self.update_user(connection, request.data)
154        return Response(self.user_to_scim(connection), status=200)

Update user handler

@atomic
def delete( self, request: rest_framework.request.Request, user_id: str, **kwargs) -> rest_framework.response.Response:
156    @atomic
157    def delete(self, request: Request, user_id: str, **kwargs) -> Response:
158        """Delete user handler"""
159        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
160        if not connection:
161            raise SCIMNotFoundError("User not found.")
162        connection.user.delete()
163        connection.delete()
164        return Response(status=204)

Delete user handler