authentik.sources.scim.tests.test_users
Test SCIM User
1"""Test SCIM User""" 2 3from json import dumps 4from uuid import uuid4 5 6from django.urls import reverse 7from rest_framework.test import APITestCase 8 9from authentik.core.tests.utils import create_test_user 10from authentik.events.models import Event, EventAction 11from authentik.lib.generators import generate_id 12from authentik.providers.scim.clients.schema import User as SCIMUserSchema 13from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE 14from authentik.sources.scim.models import SCIMSource, SCIMSourcePropertyMapping, SCIMSourceUser 15from authentik.sources.scim.views.v2.base import SCIM_CONTENT_TYPE 16 17 18class TestSCIMUsers(APITestCase): 19 """Test SCIM User view""" 20 21 def setUp(self) -> None: 22 self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id()) 23 24 def test_user_list(self): 25 """Test full user list""" 26 response = self.client.get( 27 reverse( 28 "authentik_sources_scim:v2-users", 29 kwargs={ 30 "source_slug": self.source.slug, 31 }, 32 ), 33 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 34 ) 35 self.assertEqual(response.status_code, 200) 36 37 def test_user_list_single(self): 38 """Test full user list (single user)""" 39 user = create_test_user() 40 SCIMSourceUser.objects.create( 41 source=self.source, 42 user=user, 43 id=str(uuid4()), 44 ) 45 response = self.client.get( 46 reverse( 47 "authentik_sources_scim:v2-users", 48 kwargs={ 49 "source_slug": self.source.slug, 50 "user_id": str(user.uuid), 51 }, 52 ), 53 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 54 ) 55 self.assertEqual(response.status_code, 200) 56 SCIMUserSchema.model_validate_json(response.content, strict=True) 57 58 def test_user_create(self): 59 """Test user create""" 60 user = create_test_user() 61 ext_id = generate_id() 62 response = self.client.post( 63 reverse( 64 "authentik_sources_scim:v2-users", 65 kwargs={ 66 "source_slug": self.source.slug, 67 }, 68 ), 69 data=dumps( 70 { 71 "userName": generate_id(), 72 "externalId": ext_id, 73 "emails": [ 74 { 75 "primary": True, 76 "value": user.email, 77 } 78 ], 79 } 80 ), 81 content_type=SCIM_CONTENT_TYPE, 82 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 83 ) 84 self.assertEqual(response.status_code, 201) 85 self.assertTrue( 86 SCIMSourceUser.objects.filter(source=self.source, external_id=ext_id).exists() 87 ) 88 self.assertTrue( 89 Event.objects.filter( 90 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 91 ).exists() 92 ) 93 94 def test_user_create_duplicate_by_username(self): 95 """Test user create""" 96 user = create_test_user() 97 username = generate_id() 98 obj1 = { 99 "userName": username, 100 "externalId": generate_id(), 101 "emails": [ 102 { 103 "primary": True, 104 "value": user.email, 105 } 106 ], 107 } 108 obj2 = obj1.copy() 109 obj2.update({"externalId": generate_id()}) 110 response = self.client.post( 111 reverse( 112 "authentik_sources_scim:v2-users", 113 kwargs={ 114 "source_slug": self.source.slug, 115 }, 116 ), 117 data=dumps(obj1), 118 content_type=SCIM_CONTENT_TYPE, 119 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 120 ) 121 self.assertEqual(response.status_code, 201) 122 self.assertTrue( 123 SCIMSourceUser.objects.filter(source=self.source, user__username=username).exists() 124 ) 125 self.assertTrue( 126 Event.objects.filter( 127 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 128 ).exists() 129 ) 130 response = self.client.post( 131 reverse( 132 "authentik_sources_scim:v2-users", 133 kwargs={ 134 "source_slug": self.source.slug, 135 }, 136 ), 137 data=dumps(obj2), 138 content_type=SCIM_CONTENT_TYPE, 139 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 140 ) 141 self.assertEqual(response.status_code, 409) 142 143 def test_user_property_mappings(self): 144 """Test user property_mappings""" 145 self.source.user_property_mappings.set( 146 [ 147 SCIMSourcePropertyMapping.objects.create( 148 name=generate_id(), 149 expression='return {"attributes": {"phone": data.get("phoneNumber")}}', 150 ) 151 ] 152 ) 153 user = create_test_user() 154 ext_id = generate_id() 155 response = self.client.post( 156 reverse( 157 "authentik_sources_scim:v2-users", 158 kwargs={ 159 "source_slug": self.source.slug, 160 }, 161 ), 162 data=dumps( 163 { 164 "userName": generate_id(), 165 "externalId": ext_id, 166 "emails": [ 167 { 168 "primary": True, 169 "value": user.email, 170 } 171 ], 172 "phoneNumber": "0123456789", 173 } 174 ), 175 content_type=SCIM_CONTENT_TYPE, 176 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 177 ) 178 self.assertEqual(response.status_code, 201) 179 self.assertEqual( 180 SCIMSourceUser.objects.get(source=self.source, external_id=ext_id).user.attributes[ 181 "phone" 182 ], 183 "0123456789", 184 ) 185 186 def test_user_update(self): 187 """Test user update""" 188 user = create_test_user() 189 existing = SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 190 ext_id = generate_id() 191 response = self.client.put( 192 reverse( 193 "authentik_sources_scim:v2-users", 194 kwargs={ 195 "source_slug": self.source.slug, 196 "user_id": str(user.uuid), 197 }, 198 ), 199 data=dumps( 200 { 201 "id": str(existing.pk), 202 "userName": generate_id(), 203 "externalId": ext_id, 204 "emails": [ 205 { 206 "primary": True, 207 "value": user.email, 208 } 209 ], 210 } 211 ), 212 content_type=SCIM_CONTENT_TYPE, 213 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 214 ) 215 self.assertEqual(response.status_code, 200) 216 217 def test_user_update_patch(self): 218 """Test user update (patch)""" 219 user = create_test_user() 220 existing = SCIMSourceUser.objects.create( 221 source=self.source, 222 user=user, 223 external_id=uuid4(), 224 attributes={ 225 "userName": generate_id(), 226 }, 227 ) 228 response = self.client.patch( 229 reverse( 230 "authentik_sources_scim:v2-users", 231 kwargs={ 232 "source_slug": self.source.slug, 233 "user_id": str(user.uuid), 234 }, 235 ), 236 data=dumps( 237 { 238 "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], 239 "Operations": [ 240 { 241 "op": "Add", 242 "path": f"{SCIM_URN_USER_ENTERPRISE}:manager", 243 "value": "86b2ed3e-30cd-4881-bb58-c4e910821339", 244 } 245 ], 246 } 247 ), 248 content_type=SCIM_CONTENT_TYPE, 249 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 250 ) 251 self.assertEqual(response.status_code, 200) 252 existing.refresh_from_db() 253 self.assertEqual( 254 existing.attributes[SCIM_URN_USER_ENTERPRISE], 255 {"manager": {"value": "86b2ed3e-30cd-4881-bb58-c4e910821339"}}, 256 ) 257 258 def test_user_delete(self): 259 """Test user delete""" 260 user = create_test_user() 261 SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 262 response = self.client.delete( 263 reverse( 264 "authentik_sources_scim:v2-users", 265 kwargs={ 266 "source_slug": self.source.slug, 267 "user_id": str(user.uuid), 268 }, 269 ), 270 content_type=SCIM_CONTENT_TYPE, 271 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 272 ) 273 self.assertEqual(response.status_code, 204)
class
TestSCIMUsers(rest_framework.test.APITestCase):
19class TestSCIMUsers(APITestCase): 20 """Test SCIM User view""" 21 22 def setUp(self) -> None: 23 self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id()) 24 25 def test_user_list(self): 26 """Test full user list""" 27 response = self.client.get( 28 reverse( 29 "authentik_sources_scim:v2-users", 30 kwargs={ 31 "source_slug": self.source.slug, 32 }, 33 ), 34 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 35 ) 36 self.assertEqual(response.status_code, 200) 37 38 def test_user_list_single(self): 39 """Test full user list (single user)""" 40 user = create_test_user() 41 SCIMSourceUser.objects.create( 42 source=self.source, 43 user=user, 44 id=str(uuid4()), 45 ) 46 response = self.client.get( 47 reverse( 48 "authentik_sources_scim:v2-users", 49 kwargs={ 50 "source_slug": self.source.slug, 51 "user_id": str(user.uuid), 52 }, 53 ), 54 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 55 ) 56 self.assertEqual(response.status_code, 200) 57 SCIMUserSchema.model_validate_json(response.content, strict=True) 58 59 def test_user_create(self): 60 """Test user create""" 61 user = create_test_user() 62 ext_id = generate_id() 63 response = self.client.post( 64 reverse( 65 "authentik_sources_scim:v2-users", 66 kwargs={ 67 "source_slug": self.source.slug, 68 }, 69 ), 70 data=dumps( 71 { 72 "userName": generate_id(), 73 "externalId": ext_id, 74 "emails": [ 75 { 76 "primary": True, 77 "value": user.email, 78 } 79 ], 80 } 81 ), 82 content_type=SCIM_CONTENT_TYPE, 83 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 84 ) 85 self.assertEqual(response.status_code, 201) 86 self.assertTrue( 87 SCIMSourceUser.objects.filter(source=self.source, external_id=ext_id).exists() 88 ) 89 self.assertTrue( 90 Event.objects.filter( 91 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 92 ).exists() 93 ) 94 95 def test_user_create_duplicate_by_username(self): 96 """Test user create""" 97 user = create_test_user() 98 username = generate_id() 99 obj1 = { 100 "userName": username, 101 "externalId": generate_id(), 102 "emails": [ 103 { 104 "primary": True, 105 "value": user.email, 106 } 107 ], 108 } 109 obj2 = obj1.copy() 110 obj2.update({"externalId": generate_id()}) 111 response = self.client.post( 112 reverse( 113 "authentik_sources_scim:v2-users", 114 kwargs={ 115 "source_slug": self.source.slug, 116 }, 117 ), 118 data=dumps(obj1), 119 content_type=SCIM_CONTENT_TYPE, 120 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 121 ) 122 self.assertEqual(response.status_code, 201) 123 self.assertTrue( 124 SCIMSourceUser.objects.filter(source=self.source, user__username=username).exists() 125 ) 126 self.assertTrue( 127 Event.objects.filter( 128 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 129 ).exists() 130 ) 131 response = self.client.post( 132 reverse( 133 "authentik_sources_scim:v2-users", 134 kwargs={ 135 "source_slug": self.source.slug, 136 }, 137 ), 138 data=dumps(obj2), 139 content_type=SCIM_CONTENT_TYPE, 140 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 141 ) 142 self.assertEqual(response.status_code, 409) 143 144 def test_user_property_mappings(self): 145 """Test user property_mappings""" 146 self.source.user_property_mappings.set( 147 [ 148 SCIMSourcePropertyMapping.objects.create( 149 name=generate_id(), 150 expression='return {"attributes": {"phone": data.get("phoneNumber")}}', 151 ) 152 ] 153 ) 154 user = create_test_user() 155 ext_id = generate_id() 156 response = self.client.post( 157 reverse( 158 "authentik_sources_scim:v2-users", 159 kwargs={ 160 "source_slug": self.source.slug, 161 }, 162 ), 163 data=dumps( 164 { 165 "userName": generate_id(), 166 "externalId": ext_id, 167 "emails": [ 168 { 169 "primary": True, 170 "value": user.email, 171 } 172 ], 173 "phoneNumber": "0123456789", 174 } 175 ), 176 content_type=SCIM_CONTENT_TYPE, 177 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 178 ) 179 self.assertEqual(response.status_code, 201) 180 self.assertEqual( 181 SCIMSourceUser.objects.get(source=self.source, external_id=ext_id).user.attributes[ 182 "phone" 183 ], 184 "0123456789", 185 ) 186 187 def test_user_update(self): 188 """Test user update""" 189 user = create_test_user() 190 existing = SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 191 ext_id = generate_id() 192 response = self.client.put( 193 reverse( 194 "authentik_sources_scim:v2-users", 195 kwargs={ 196 "source_slug": self.source.slug, 197 "user_id": str(user.uuid), 198 }, 199 ), 200 data=dumps( 201 { 202 "id": str(existing.pk), 203 "userName": generate_id(), 204 "externalId": ext_id, 205 "emails": [ 206 { 207 "primary": True, 208 "value": user.email, 209 } 210 ], 211 } 212 ), 213 content_type=SCIM_CONTENT_TYPE, 214 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 215 ) 216 self.assertEqual(response.status_code, 200) 217 218 def test_user_update_patch(self): 219 """Test user update (patch)""" 220 user = create_test_user() 221 existing = SCIMSourceUser.objects.create( 222 source=self.source, 223 user=user, 224 external_id=uuid4(), 225 attributes={ 226 "userName": generate_id(), 227 }, 228 ) 229 response = self.client.patch( 230 reverse( 231 "authentik_sources_scim:v2-users", 232 kwargs={ 233 "source_slug": self.source.slug, 234 "user_id": str(user.uuid), 235 }, 236 ), 237 data=dumps( 238 { 239 "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], 240 "Operations": [ 241 { 242 "op": "Add", 243 "path": f"{SCIM_URN_USER_ENTERPRISE}:manager", 244 "value": "86b2ed3e-30cd-4881-bb58-c4e910821339", 245 } 246 ], 247 } 248 ), 249 content_type=SCIM_CONTENT_TYPE, 250 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 251 ) 252 self.assertEqual(response.status_code, 200) 253 existing.refresh_from_db() 254 self.assertEqual( 255 existing.attributes[SCIM_URN_USER_ENTERPRISE], 256 {"manager": {"value": "86b2ed3e-30cd-4881-bb58-c4e910821339"}}, 257 ) 258 259 def test_user_delete(self): 260 """Test user delete""" 261 user = create_test_user() 262 SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 263 response = self.client.delete( 264 reverse( 265 "authentik_sources_scim:v2-users", 266 kwargs={ 267 "source_slug": self.source.slug, 268 "user_id": str(user.uuid), 269 }, 270 ), 271 content_type=SCIM_CONTENT_TYPE, 272 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 273 ) 274 self.assertEqual(response.status_code, 204)
Test SCIM User view
def
setUp(self) -> None:
22 def setUp(self) -> None: 23 self.source = SCIMSource.objects.create(name=generate_id(), slug=generate_id())
Hook method for setting up the test fixture before exercising it.
def
test_user_list(self):
25 def test_user_list(self): 26 """Test full user list""" 27 response = self.client.get( 28 reverse( 29 "authentik_sources_scim:v2-users", 30 kwargs={ 31 "source_slug": self.source.slug, 32 }, 33 ), 34 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 35 ) 36 self.assertEqual(response.status_code, 200)
Test full user list
def
test_user_list_single(self):
38 def test_user_list_single(self): 39 """Test full user list (single user)""" 40 user = create_test_user() 41 SCIMSourceUser.objects.create( 42 source=self.source, 43 user=user, 44 id=str(uuid4()), 45 ) 46 response = self.client.get( 47 reverse( 48 "authentik_sources_scim:v2-users", 49 kwargs={ 50 "source_slug": self.source.slug, 51 "user_id": str(user.uuid), 52 }, 53 ), 54 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 55 ) 56 self.assertEqual(response.status_code, 200) 57 SCIMUserSchema.model_validate_json(response.content, strict=True)
Test full user list (single user)
def
test_user_create(self):
59 def test_user_create(self): 60 """Test user create""" 61 user = create_test_user() 62 ext_id = generate_id() 63 response = self.client.post( 64 reverse( 65 "authentik_sources_scim:v2-users", 66 kwargs={ 67 "source_slug": self.source.slug, 68 }, 69 ), 70 data=dumps( 71 { 72 "userName": generate_id(), 73 "externalId": ext_id, 74 "emails": [ 75 { 76 "primary": True, 77 "value": user.email, 78 } 79 ], 80 } 81 ), 82 content_type=SCIM_CONTENT_TYPE, 83 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 84 ) 85 self.assertEqual(response.status_code, 201) 86 self.assertTrue( 87 SCIMSourceUser.objects.filter(source=self.source, external_id=ext_id).exists() 88 ) 89 self.assertTrue( 90 Event.objects.filter( 91 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 92 ).exists() 93 )
Test user create
def
test_user_create_duplicate_by_username(self):
95 def test_user_create_duplicate_by_username(self): 96 """Test user create""" 97 user = create_test_user() 98 username = generate_id() 99 obj1 = { 100 "userName": username, 101 "externalId": generate_id(), 102 "emails": [ 103 { 104 "primary": True, 105 "value": user.email, 106 } 107 ], 108 } 109 obj2 = obj1.copy() 110 obj2.update({"externalId": generate_id()}) 111 response = self.client.post( 112 reverse( 113 "authentik_sources_scim:v2-users", 114 kwargs={ 115 "source_slug": self.source.slug, 116 }, 117 ), 118 data=dumps(obj1), 119 content_type=SCIM_CONTENT_TYPE, 120 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 121 ) 122 self.assertEqual(response.status_code, 201) 123 self.assertTrue( 124 SCIMSourceUser.objects.filter(source=self.source, user__username=username).exists() 125 ) 126 self.assertTrue( 127 Event.objects.filter( 128 action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username 129 ).exists() 130 ) 131 response = self.client.post( 132 reverse( 133 "authentik_sources_scim:v2-users", 134 kwargs={ 135 "source_slug": self.source.slug, 136 }, 137 ), 138 data=dumps(obj2), 139 content_type=SCIM_CONTENT_TYPE, 140 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 141 ) 142 self.assertEqual(response.status_code, 409)
Test user create
def
test_user_property_mappings(self):
144 def test_user_property_mappings(self): 145 """Test user property_mappings""" 146 self.source.user_property_mappings.set( 147 [ 148 SCIMSourcePropertyMapping.objects.create( 149 name=generate_id(), 150 expression='return {"attributes": {"phone": data.get("phoneNumber")}}', 151 ) 152 ] 153 ) 154 user = create_test_user() 155 ext_id = generate_id() 156 response = self.client.post( 157 reverse( 158 "authentik_sources_scim:v2-users", 159 kwargs={ 160 "source_slug": self.source.slug, 161 }, 162 ), 163 data=dumps( 164 { 165 "userName": generate_id(), 166 "externalId": ext_id, 167 "emails": [ 168 { 169 "primary": True, 170 "value": user.email, 171 } 172 ], 173 "phoneNumber": "0123456789", 174 } 175 ), 176 content_type=SCIM_CONTENT_TYPE, 177 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 178 ) 179 self.assertEqual(response.status_code, 201) 180 self.assertEqual( 181 SCIMSourceUser.objects.get(source=self.source, external_id=ext_id).user.attributes[ 182 "phone" 183 ], 184 "0123456789", 185 )
Test user property_mappings
def
test_user_update(self):
187 def test_user_update(self): 188 """Test user update""" 189 user = create_test_user() 190 existing = SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 191 ext_id = generate_id() 192 response = self.client.put( 193 reverse( 194 "authentik_sources_scim:v2-users", 195 kwargs={ 196 "source_slug": self.source.slug, 197 "user_id": str(user.uuid), 198 }, 199 ), 200 data=dumps( 201 { 202 "id": str(existing.pk), 203 "userName": generate_id(), 204 "externalId": ext_id, 205 "emails": [ 206 { 207 "primary": True, 208 "value": user.email, 209 } 210 ], 211 } 212 ), 213 content_type=SCIM_CONTENT_TYPE, 214 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 215 ) 216 self.assertEqual(response.status_code, 200)
Test user update
def
test_user_update_patch(self):
218 def test_user_update_patch(self): 219 """Test user update (patch)""" 220 user = create_test_user() 221 existing = SCIMSourceUser.objects.create( 222 source=self.source, 223 user=user, 224 external_id=uuid4(), 225 attributes={ 226 "userName": generate_id(), 227 }, 228 ) 229 response = self.client.patch( 230 reverse( 231 "authentik_sources_scim:v2-users", 232 kwargs={ 233 "source_slug": self.source.slug, 234 "user_id": str(user.uuid), 235 }, 236 ), 237 data=dumps( 238 { 239 "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], 240 "Operations": [ 241 { 242 "op": "Add", 243 "path": f"{SCIM_URN_USER_ENTERPRISE}:manager", 244 "value": "86b2ed3e-30cd-4881-bb58-c4e910821339", 245 } 246 ], 247 } 248 ), 249 content_type=SCIM_CONTENT_TYPE, 250 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 251 ) 252 self.assertEqual(response.status_code, 200) 253 existing.refresh_from_db() 254 self.assertEqual( 255 existing.attributes[SCIM_URN_USER_ENTERPRISE], 256 {"manager": {"value": "86b2ed3e-30cd-4881-bb58-c4e910821339"}}, 257 )
Test user update (patch)
def
test_user_delete(self):
259 def test_user_delete(self): 260 """Test user delete""" 261 user = create_test_user() 262 SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) 263 response = self.client.delete( 264 reverse( 265 "authentik_sources_scim:v2-users", 266 kwargs={ 267 "source_slug": self.source.slug, 268 "user_id": str(user.uuid), 269 }, 270 ), 271 content_type=SCIM_CONTENT_TYPE, 272 HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", 273 ) 274 self.assertEqual(response.status_code, 204)
Test user delete