authentik.sources.scim.views.v2.users
SCIM User Views
"""SCIM User Views"""

from uuid import uuid4

from django.db.models import Q
from django.db.transaction import atomic
from django.http import QueryDict
from django.urls import reverse
from pydanticscim.user import EmailKind, Name
from rest_framework.exceptions import ValidationError
from rest_framework.request import Request
from rest_framework.response import Response

from authentik.core.models import User
from authentik.providers.scim.clients.schema import SCIM_USER_SCHEMA, Email
from authentik.providers.scim.clients.schema import User as SCIMUserModel
from authentik.sources.scim.models import SCIMSourceUser
from authentik.sources.scim.patch.processor import SCIMPatchProcessor
from authentik.sources.scim.views.v2.base import SCIMObjectView
from authentik.sources.scim.views.v2.exceptions import SCIMConflictError, SCIMNotFoundError


class UsersView(SCIMObjectView):
    """SCIM User view"""

    model = User

    def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
        """Convert User to SCIM data

        A payload is generated from the linked user's fields, then overlaid
        with the attributes stored on the connection (stored keys win over
        generated ones). The merged dict is validated against the SCIM user
        model again before being passed through `remove_excluded_attributes`.
        """
        payload = SCIMUserModel(
            schemas=[SCIM_USER_SCHEMA],
            id=str(scim_user.user.uuid),
            externalId=scim_user.external_id,
            userName=scim_user.user.username,
            name=Name(
                formatted=scim_user.user.name,
            ),
            displayName=scim_user.user.name,
            active=scim_user.user.is_active,
            emails=(
                [Email(value=scim_user.user.email, type=EmailKind.work, primary=True)]
                if scim_user.user.email
                else []
            ),
            meta={
                "resourceType": "User",
                "created": scim_user.user.date_joined,
                "lastModified": scim_user.last_update,
                "location": self.request.build_absolute_uri(
                    reverse(
                        "authentik_sources_scim:v2-users",
                        kwargs={
                            "source_slug": self.kwargs["source_slug"],
                            "user_id": str(scim_user.user.uuid),
                        },
                    )
                ),
            },
        )
        final_payload = payload.model_dump(mode="json", exclude_unset=True)
        # Stored attributes take precedence over the generated values.
        final_payload.update(scim_user.attributes)
        return self.remove_excluded_attributes(
            SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True)
        )

    def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
        """Get a single user when `user_id` is given, otherwise list users

        The list is restricted to this source, filtered by the parsed request
        filter, ordered by primary key and paginated.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        if user_id:
            connection = (
                SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id)
                .select_related("user")
                .first()
            )
            if not connection:
                raise SCIMNotFoundError("User not found.")
            return Response(self.user_to_scim(connection))
        connections = (
            SCIMSourceUser.objects.filter(source=self.source).select_related("user").order_by("pk")
        )
        connections = connections.filter(self.filter_parse(request))
        page = self.paginate_query(connections)
        return Response(
            {
                "totalResults": page.paginator.count,
                "itemsPerPage": page.paginator.per_page,
                "startIndex": page.start_index(),
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
                "Resources": [self.user_to_scim(connection) for connection in page],
            }
        )

    @atomic
    def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
        """Create or update a user and its connection from SCIM data

        With `connection` set to None a new connection is created (or an
        existing one matching external ID, source and user is updated);
        otherwise the given connection is updated in place and saved.

        Raises ValidationError if no username can be derived from `data`.
        Returns the created or updated connection.
        """
        properties = self.build_object_properties(data)

        if not properties.get("username"):
            raise ValidationError("Invalid user")

        user = connection.user if connection else User()
        # An existing user with the requested username is preferred over the
        # connection's user or a new one.
        if _user := User.objects.filter(username=properties.get("username")).first():
            user = _user
        user.update_attributes(properties)

        if not connection:
            connection, _ = SCIMSourceUser.objects.update_or_create(
                # Fall back to a random ID when the client sent no externalId.
                external_id=data.get("externalId") or str(uuid4()),
                source=self.source,
                user=user,
                defaults={
                    "attributes": data,
                },
            )
        else:
            connection.external_id = data.get("externalId", connection.external_id)
            connection.attributes = data
            connection.save()
        return connection

    def post(self, request: Request, **kwargs) -> Response:
        """Create user handler

        Raises SCIMConflictError if this source already has a connection whose
        user matches the submitted `id` or `userName`.
        """
        connection = SCIMSourceUser.objects.filter(
            Q(
                Q(user__uuid=request.data.get("id"))
                | Q(user__username=request.data.get("userName"))
            ),
            source=self.source,
        ).first()
        if connection:
            self.logger.debug("Found existing user")
            # This endpoint manages users; the message previously said "Group".
            raise SCIMConflictError("User with ID exists already.")
        connection = self.update_user(None, request.data)
        return Response(self.user_to_scim(connection), status=201)

    def patch(self, request: Request, user_id: str, **kwargs) -> Response:
        """Patch user handler

        Applies the request's `Operations` to the stored attributes and only
        updates the user when the patched data differs from what is stored.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        patcher = SCIMPatchProcessor()
        patched_data = patcher.apply_patches(
            connection.attributes, request.data.get("Operations", [])
        )
        # NOTE(review): this only detects changes if apply_patches returns a
        # new object instead of mutating its input - confirm.
        if patched_data != connection.attributes:
            self.update_user(connection, patched_data)
        return Response(self.user_to_scim(connection), status=200)

    def put(self, request: Request, user_id: str, **kwargs) -> Response:
        """Update user handler

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        self.update_user(connection, request.data)
        return Response(self.user_to_scim(connection), status=200)

    @atomic
    def delete(self, request: Request, user_id: str, **kwargs) -> Response:
        """Delete user handler

        Deletes the linked user and then the connection.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        connection.user.delete()
        connection.delete()
        return Response(status=204)
class UsersView(SCIMObjectView):
    """SCIM User view"""

    model = User

    def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
        """Convert User to SCIM data

        A payload is generated from the linked user's fields, then overlaid
        with the attributes stored on the connection (stored keys win over
        generated ones). The merged dict is validated against the SCIM user
        model again before being passed through `remove_excluded_attributes`.
        """
        payload = SCIMUserModel(
            schemas=[SCIM_USER_SCHEMA],
            id=str(scim_user.user.uuid),
            externalId=scim_user.external_id,
            userName=scim_user.user.username,
            name=Name(
                formatted=scim_user.user.name,
            ),
            displayName=scim_user.user.name,
            active=scim_user.user.is_active,
            emails=(
                [Email(value=scim_user.user.email, type=EmailKind.work, primary=True)]
                if scim_user.user.email
                else []
            ),
            meta={
                "resourceType": "User",
                "created": scim_user.user.date_joined,
                "lastModified": scim_user.last_update,
                "location": self.request.build_absolute_uri(
                    reverse(
                        "authentik_sources_scim:v2-users",
                        kwargs={
                            "source_slug": self.kwargs["source_slug"],
                            "user_id": str(scim_user.user.uuid),
                        },
                    )
                ),
            },
        )
        final_payload = payload.model_dump(mode="json", exclude_unset=True)
        # Stored attributes take precedence over the generated values.
        final_payload.update(scim_user.attributes)
        return self.remove_excluded_attributes(
            SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True)
        )

    def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
        """Get a single user when `user_id` is given, otherwise list users

        The list is restricted to this source, filtered by the parsed request
        filter, ordered by primary key and paginated.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        if user_id:
            connection = (
                SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id)
                .select_related("user")
                .first()
            )
            if not connection:
                raise SCIMNotFoundError("User not found.")
            return Response(self.user_to_scim(connection))
        connections = (
            SCIMSourceUser.objects.filter(source=self.source).select_related("user").order_by("pk")
        )
        connections = connections.filter(self.filter_parse(request))
        page = self.paginate_query(connections)
        return Response(
            {
                "totalResults": page.paginator.count,
                "itemsPerPage": page.paginator.per_page,
                "startIndex": page.start_index(),
                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
                "Resources": [self.user_to_scim(connection) for connection in page],
            }
        )

    @atomic
    def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
        """Create or update a user and its connection from SCIM data

        With `connection` set to None a new connection is created (or an
        existing one matching external ID, source and user is updated);
        otherwise the given connection is updated in place and saved.

        Raises ValidationError if no username can be derived from `data`.
        Returns the created or updated connection.
        """
        properties = self.build_object_properties(data)

        if not properties.get("username"):
            raise ValidationError("Invalid user")

        user = connection.user if connection else User()
        # An existing user with the requested username is preferred over the
        # connection's user or a new one.
        if _user := User.objects.filter(username=properties.get("username")).first():
            user = _user
        user.update_attributes(properties)

        if not connection:
            connection, _ = SCIMSourceUser.objects.update_or_create(
                # Fall back to a random ID when the client sent no externalId.
                external_id=data.get("externalId") or str(uuid4()),
                source=self.source,
                user=user,
                defaults={
                    "attributes": data,
                },
            )
        else:
            connection.external_id = data.get("externalId", connection.external_id)
            connection.attributes = data
            connection.save()
        return connection

    def post(self, request: Request, **kwargs) -> Response:
        """Create user handler

        Raises SCIMConflictError if this source already has a connection whose
        user matches the submitted `id` or `userName`.
        """
        connection = SCIMSourceUser.objects.filter(
            Q(
                Q(user__uuid=request.data.get("id"))
                | Q(user__username=request.data.get("userName"))
            ),
            source=self.source,
        ).first()
        if connection:
            self.logger.debug("Found existing user")
            # This endpoint manages users; the message previously said "Group".
            raise SCIMConflictError("User with ID exists already.")
        connection = self.update_user(None, request.data)
        return Response(self.user_to_scim(connection), status=201)

    def patch(self, request: Request, user_id: str, **kwargs) -> Response:
        """Patch user handler

        Applies the request's `Operations` to the stored attributes and only
        updates the user when the patched data differs from what is stored.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        patcher = SCIMPatchProcessor()
        patched_data = patcher.apply_patches(
            connection.attributes, request.data.get("Operations", [])
        )
        # NOTE(review): this only detects changes if apply_patches returns a
        # new object instead of mutating its input - confirm.
        if patched_data != connection.attributes:
            self.update_user(connection, patched_data)
        return Response(self.user_to_scim(connection), status=200)

    def put(self, request: Request, user_id: str, **kwargs) -> Response:
        """Update user handler

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        self.update_user(connection, request.data)
        return Response(self.user_to_scim(connection), status=200)

    @atomic
    def delete(self, request: Request, user_id: str, **kwargs) -> Response:
        """Delete user handler

        Deletes the linked user and then the connection.

        Raises SCIMNotFoundError if `user_id` matches no user of this source.
        """
        connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
        if not connection:
            raise SCIMNotFoundError("User not found.")
        connection.user.delete()
        connection.delete()
        return Response(status=204)
SCIM User view
model =
<class 'authentik.core.models.User'>
def user_to_scim(self, scim_user: SCIMSourceUser) -> dict:
    """Serialize a source connection and its user into a SCIM user dict.

    Values generated from the user are overlaid with the attributes stored
    on the connection, validated against the SCIM user model once more and
    finally passed through `remove_excluded_attributes`.
    """
    user = scim_user.user

    # Only advertise an email entry when the user actually has one.
    emails = []
    if user.email:
        emails = [Email(value=user.email, type=EmailKind.work, primary=True)]

    generated = SCIMUserModel(
        schemas=[SCIM_USER_SCHEMA],
        id=str(user.uuid),
        externalId=scim_user.external_id,
        userName=user.username,
        name=Name(formatted=user.name),
        displayName=user.name,
        active=user.is_active,
        emails=emails,
        meta={
            "resourceType": "User",
            "created": user.date_joined,
            "lastModified": scim_user.last_update,
            "location": self.request.build_absolute_uri(
                reverse(
                    "authentik_sources_scim:v2-users",
                    kwargs={
                        "source_slug": self.kwargs["source_slug"],
                        "user_id": str(user.uuid),
                    },
                )
            ),
        },
    )

    merged = generated.model_dump(mode="json", exclude_unset=True)
    # Stored attributes override the generated values on key collisions.
    merged.update(scim_user.attributes)

    validated = SCIMUserModel.model_validate(merged)
    serialized = validated.model_dump(mode="json", exclude_unset=True)
    return self.remove_excluded_attributes(serialized)
Convert User to SCIM data
def
get( self, request: rest_framework.request.Request, user_id: str | None = None, **kwargs) -> rest_framework.response.Response:
def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response:
    """Return one user if `user_id` is set, else a paginated user list.

    Raises SCIMNotFoundError if `user_id` matches no user of this source.
    """
    if not user_id:
        # List request: restrict to this source, apply the parsed request
        # filter and paginate in a stable (primary key) order.
        queryset = (
            SCIMSourceUser.objects.filter(source=self.source)
            .select_related("user")
            .order_by("pk")
        )
        queryset = queryset.filter(self.filter_parse(request))
        page = self.paginate_query(queryset)
        listing = {
            "totalResults": page.paginator.count,
            "itemsPerPage": page.paginator.per_page,
            "startIndex": page.start_index(),
            "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
            "Resources": [self.user_to_scim(entry) for entry in page],
        }
        return Response(listing)

    matches = SCIMSourceUser.objects.filter(
        source=self.source, user__uuid=user_id
    ).select_related("user")
    connection = matches.first()
    if connection is None:
        raise SCIMNotFoundError("User not found.")
    return Response(self.user_to_scim(connection))
List User handler
@atomic
def
update_user( self, connection: authentik.sources.scim.models.SCIMSourceUser | None, data: django.http.request.QueryDict):
@atomic
def update_user(self, connection: SCIMSourceUser | None, data: QueryDict):
    """Create or update the user and connection described by `data`.

    Raises ValidationError if no username can be derived from `data`.
    Returns the connection that was created or updated.
    """
    properties = self.build_object_properties(data)
    username = properties.get("username")
    if not username:
        raise ValidationError("Invalid user")

    target = connection.user if connection else User()
    # A user that already owns the username takes precedence.
    existing = User.objects.filter(username=username).first()
    if existing:
        target = existing
    target.update_attributes(properties)

    if connection:
        connection.external_id = data.get("externalId", connection.external_id)
        connection.attributes = data
        connection.save()
        return connection

    # No connection yet: fall back to a random external ID if none was sent.
    new_connection, _ = SCIMSourceUser.objects.update_or_create(
        external_id=data.get("externalId") or str(uuid4()),
        source=self.source,
        user=target,
        defaults={"attributes": data},
    )
    return new_connection
Partial update a user
def
post( self, request: rest_framework.request.Request, **kwargs) -> rest_framework.response.Response:
def post(self, request: Request, **kwargs) -> Response:
    """Create user handler

    Raises SCIMConflictError if this source already has a connection whose
    user matches the submitted `id` or `userName`. On success the new user
    is returned with status 201.
    """
    connection = SCIMSourceUser.objects.filter(
        Q(user__uuid=request.data.get("id"))
        | Q(user__username=request.data.get("userName")),
        source=self.source,
    ).first()
    if connection:
        self.logger.debug("Found existing user")
        # This endpoint manages users; the message previously said "Group".
        raise SCIMConflictError("User with ID exists already.")
    connection = self.update_user(None, request.data)
    return Response(self.user_to_scim(connection), status=201)
Create user handler
def
patch( self, request: rest_framework.request.Request, user_id: str, **kwargs):
def patch(self, request: Request, user_id: str, **kwargs) -> Response:
    """Patch user handler

    Applies the request's `Operations` to the stored attributes and only
    updates the user when the patched data differs from what is stored.

    Raises SCIMNotFoundError if `user_id` matches no user of this source.
    """
    connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first()
    if not connection:
        raise SCIMNotFoundError("User not found.")
    patcher = SCIMPatchProcessor()
    patched_data = patcher.apply_patches(
        connection.attributes, request.data.get("Operations", [])
    )
    # NOTE(review): this only detects changes if apply_patches returns a new
    # object instead of mutating its input - confirm.
    if patched_data != connection.attributes:
        self.update_user(connection, patched_data)
    return Response(self.user_to_scim(connection), status=200)
def
put( self, request: rest_framework.request.Request, user_id: str, **kwargs) -> rest_framework.response.Response:
def put(self, request: Request, user_id: str, **kwargs) -> Response:
    """Update the user identified by `user_id` from the request body.

    Raises SCIMNotFoundError if `user_id` matches no user of this source.
    """
    lookup = {"source": self.source, "user__uuid": user_id}
    connection = SCIMSourceUser.objects.filter(**lookup).first()
    if connection is None:
        raise SCIMNotFoundError("User not found.")
    self.update_user(connection, request.data)
    body = self.user_to_scim(connection)
    return Response(body, status=200)
Update user handler
@atomic
def
delete( self, request: rest_framework.request.Request, user_id: str, **kwargs) -> rest_framework.response.Response:
@atomic
def delete(self, request: Request, user_id: str, **kwargs) -> Response:
    """Delete the user identified by `user_id` together with its connection.

    Raises SCIMNotFoundError if `user_id` matches no user of this source.
    """
    lookup = {"source": self.source, "user__uuid": user_id}
    connection = SCIMSourceUser.objects.filter(**lookup).first()
    if connection is None:
        raise SCIMNotFoundError("User not found.")
    # The user is removed first, then the connection itself.
    linked_user = connection.user
    linked_user.delete()
    connection.delete()
    return Response(status=204)
Delete user handler