authentik.providers.scim.tests.test_group
SCIM Group tests
1"""SCIM Group tests""" 2 3from json import loads 4 5from django.test import TestCase 6from jsonschema import validate 7from requests_mock import Mocker 8 9from authentik.blueprints.tests import apply_blueprint 10from authentik.core.models import Application, Group, User 11from authentik.lib.generators import generate_id 12from authentik.providers.scim.models import SCIMMapping, SCIMProvider, SCIMProviderGroup 13from authentik.providers.scim.tasks import scim_sync 14 15 16class SCIMGroupTests(TestCase): 17 """SCIM Group tests""" 18 19 @apply_blueprint("system/providers-scim.yaml") 20 def setUp(self) -> None: 21 # Delete all users and groups as the mocked HTTP responses only return one ID 22 # which will cause errors with multiple users 23 User.objects.all().exclude_anonymous().delete() 24 Group.objects.all().delete() 25 self.provider: SCIMProvider = SCIMProvider.objects.create( 26 name=generate_id(), 27 url="https://localhost", 28 token=generate_id(), 29 ) 30 self.app: Application = Application.objects.create( 31 name=generate_id(), 32 slug=generate_id(), 33 ) 34 self.app.backchannel_providers.add(self.provider) 35 self.provider.property_mappings.set( 36 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")] 37 ) 38 self.provider.property_mappings_group.set( 39 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")] 40 ) 41 42 @Mocker() 43 def test_group_create(self, mock: Mocker): 44 """Test group creation""" 45 scim_id = generate_id() 46 mock.get( 47 "https://localhost/ServiceProviderConfig", 48 json={}, 49 ) 50 mock.post( 51 "https://localhost/Groups", 52 json={ 53 "id": scim_id, 54 }, 55 ) 56 uid = generate_id() 57 group = Group.objects.create( 58 name=uid, 59 ) 60 self.assertEqual(mock.call_count, 2) 61 self.assertEqual(mock.request_history[0].method, "GET") 62 self.assertEqual(mock.request_history[1].method, "POST") 63 self.assertJSONEqual( 64 mock.request_history[1].body, 65 { 66 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 67 "externalId": str(group.pk), 68 "displayName": group.name, 69 }, 70 ) 71 72 @Mocker() 73 def test_group_create_update(self, mock: Mocker): 74 """Test group creation and update""" 75 scim_id = generate_id() 76 mock.get( 77 "https://localhost/ServiceProviderConfig", 78 json={}, 79 ) 80 mock.post( 81 "https://localhost/Groups", 82 json={ 83 "id": scim_id, 84 }, 85 ) 86 mock.put( 87 "https://localhost/Groups", 88 json={ 89 "id": scim_id, 90 }, 91 ) 92 uid = generate_id() 93 group = Group.objects.create( 94 name=uid, 95 ) 96 self.assertEqual(mock.call_count, 2) 97 self.assertEqual(mock.request_history[0].method, "GET") 98 self.assertEqual(mock.request_history[1].method, "POST") 99 body = loads(mock.request_history[1].body) 100 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 101 validate(body, loads(schema.read())) 102 self.assertEqual( 103 body, 104 { 105 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 106 "externalId": str(group.pk), 107 "displayName": group.name, 108 }, 109 ) 110 group.name = generate_id() 111 group.save() 112 self.assertEqual(mock.call_count, 3) 113 self.assertEqual(mock.request_history[0].method, "GET") 114 self.assertEqual(mock.request_history[1].method, "POST") 115 self.assertEqual(mock.request_history[2].method, "PUT") 116 117 @Mocker() 118 def test_group_create_delete(self, mock: Mocker): 119 """Test group creation""" 120 scim_id = generate_id() 121 mock.get( 122 "https://localhost/ServiceProviderConfig", 123 json={}, 124 ) 125 mock.post( 126 "https://localhost/Groups", 127 json={ 128 "id": scim_id, 129 }, 130 ) 131 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 132 uid = generate_id() 133 group = Group.objects.create( 134 name=uid, 135 ) 136 self.assertEqual(mock.call_count, 2) 137 self.assertEqual(mock.request_history[0].method, "GET") 138 self.assertEqual(mock.request_history[1].method, "POST") 139 self.assertJSONEqual( 140 mock.request_history[1].body, 141 { 142 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 143 "externalId": str(group.pk), 144 "displayName": group.name, 145 }, 146 ) 147 group.delete() 148 self.assertEqual(mock.call_count, 3) 149 self.assertEqual(mock.request_history[0].method, "GET") 150 self.assertEqual(mock.request_history[2].method, "DELETE") 151 self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}") 152 153 @Mocker() 154 def test_group_create_update_noop(self, mock: Mocker): 155 """Test group creation and noop update""" 156 scim_id = generate_id() 157 mock.get( 158 "https://localhost/ServiceProviderConfig", 159 json={}, 160 ) 161 mock.post( 162 "https://localhost/Groups", 163 json={ 164 "id": scim_id, 165 }, 166 ) 167 uid = generate_id() 168 group = Group.objects.create( 169 name=uid, 170 ) 171 self.assertEqual(mock.call_count, 2) 172 self.assertEqual(mock.request_history[0].method, "GET") 173 self.assertEqual(mock.request_history[1].method, "POST") 174 body = loads(mock.request_history[1].body) 175 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 176 validate(body, loads(schema.read())) 177 self.assertEqual( 178 body, 179 { 180 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 181 "externalId": str(group.pk), 182 "displayName": group.name, 183 }, 184 ) 185 conn = SCIMProviderGroup.objects.filter(group=group).first() 186 conn.attributes = { 187 "id": scim_id, 188 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 189 "externalId": str(group.pk), 190 "displayName": group.name, 191 } 192 conn.save() 193 mock.get( 194 f"https://localhost/Groups/{scim_id}", 195 json={ 196 "id": scim_id, 197 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 198 "externalId": str(group.pk), 199 "displayName": group.name, 200 "members": [], 201 }, 202 ) 203 group.save() 204 self.assertEqual(mock.call_count, 3) 205 self.assertEqual(mock.request_history[0].method, "GET") 206 self.assertEqual(mock.request_history[1].method, "POST") 207 self.assertEqual(mock.request_history[2].method, "GET") 208 self.assertNotIn("PUT", [req.method for req in mock.request_history]) 209 210 @Mocker() 211 def test_discover(self, mock: Mocker): 212 group = Group.objects.create(name="acl_admins") 213 mock.get( 214 "https://localhost/ServiceProviderConfig", 215 json={}, 216 ) 217 mock.get( 218 "https://localhost/Groups", 219 json={ 220 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 221 "totalResults": 2, 222 "startIndex": 1, 223 "itemsPerPage": 1, 224 "Resources": [ 225 { 226 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 227 "id": "3", 228 "displayName": "acl_admins", 229 "meta": {"resourceType": "Group"}, 230 "members": [], 231 }, 232 ], 233 }, 234 ) 235 mock.get( 236 "https://localhost/Groups?startIndex=2", 237 json={ 238 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 239 "totalResults": 2, 240 "startIndex": 2, 241 "itemsPerPage": 1, 242 "Resources": [ 243 { 244 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 245 "id": "10", 246 "displayName": "test", 247 "meta": {"resourceType": "Group"}, 248 "members": [], 249 }, 250 ], 251 }, 252 ) 253 self.provider.client_for_model(Group).discover() 254 connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first() 255 self.assertIsNotNone(connection) 256 self.assertEqual(connection.scim_id, "3") 257 258 def _create_stale_provider_group(self, scim_id: str) -> Group: 259 """Create a group that is outside the provider's scope (via group_filters) with an 260 existing SCIMProviderGroup, simulating a previously synced group now out of scope.""" 261 self.app.backchannel_providers.remove(self.provider) 262 anchor = Group.objects.create(name=generate_id()) 263 stale = Group.objects.create(name=generate_id()) 264 self.app.backchannel_providers.add(self.provider) 265 266 self.provider.group_filters.set([anchor]) 267 SCIMProviderGroup.objects.create(provider=self.provider, group=stale, scim_id=scim_id) 268 return stale 269 270 @Mocker() 271 def test_sync_cleanup_stale_group_delete(self, mock: Mocker): 272 """Stale (out-of-scope) groups are deleted during full sync cleanup""" 273 scim_id = generate_id() 274 mock.get("https://localhost/ServiceProviderConfig", json={}) 275 276 mock.post("https://localhost/Groups", json={"id": generate_id()}) 277 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 278 self._create_stale_provider_group(scim_id) 279 280 scim_sync.send(self.provider.pk).get_result() 281 282 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 283 self.assertEqual(len(delete_reqs), 1) 284 self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}") 285 self.assertFalse( 286 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 287 ) 288 289 @Mocker() 290 def test_sync_cleanup_stale_group_not_found(self, mock: Mocker): 291 """Stale group cleanup handles 404 from the remote gracefully""" 292 scim_id = generate_id() 293 mock.get("https://localhost/ServiceProviderConfig", json={}) 294 mock.post("https://localhost/Groups", json={"id": generate_id()}) 295 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404) 296 self._create_stale_provider_group(scim_id) 297 298 scim_sync.send(self.provider.pk).get_result() 299 300 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 301 self.assertEqual(len(delete_reqs), 1) 302 303 self.assertFalse( 304 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 305 ) 306 307 @Mocker() 308 def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker): 309 """Stale group cleanup logs and retries on transient HTTP errors""" 310 scim_id = generate_id() 311 mock.get("https://localhost/ServiceProviderConfig", json={}) 312 mock.post("https://localhost/Groups", json={"id": generate_id()}) 313 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429) 314 self._create_stale_provider_group(scim_id) 315 316 scim_sync.send(self.provider.pk) 317 318 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 319 self.assertEqual(len(delete_reqs), 1) 320 321 @Mocker() 322 def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker): 323 """Stale group cleanup skips HTTP DELETE in dry_run mode""" 324 self.provider.dry_run = True 325 self.provider.save() 326 scim_id = generate_id() 327 mock.get("https://localhost/ServiceProviderConfig", json={}) 328 self._create_stale_provider_group(scim_id) 329 330 scim_sync.send(self.provider.pk) 331 332 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 333 self.assertEqual(len(delete_reqs), 0)
class
SCIMGroupTests(django.test.testcases.TestCase):
17class SCIMGroupTests(TestCase): 18 """SCIM Group tests""" 19 20 @apply_blueprint("system/providers-scim.yaml") 21 def setUp(self) -> None: 22 # Delete all users and groups as the mocked HTTP responses only return one ID 23 # which will cause errors with multiple users 24 User.objects.all().exclude_anonymous().delete() 25 Group.objects.all().delete() 26 self.provider: SCIMProvider = SCIMProvider.objects.create( 27 name=generate_id(), 28 url="https://localhost", 29 token=generate_id(), 30 ) 31 self.app: Application = Application.objects.create( 32 name=generate_id(), 33 slug=generate_id(), 34 ) 35 self.app.backchannel_providers.add(self.provider) 36 self.provider.property_mappings.set( 37 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")] 38 ) 39 self.provider.property_mappings_group.set( 40 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")] 41 ) 42 43 @Mocker() 44 def test_group_create(self, mock: Mocker): 45 """Test group creation""" 46 scim_id = generate_id() 47 mock.get( 48 "https://localhost/ServiceProviderConfig", 49 json={}, 50 ) 51 mock.post( 52 "https://localhost/Groups", 53 json={ 54 "id": scim_id, 55 }, 56 ) 57 uid = generate_id() 58 group = Group.objects.create( 59 name=uid, 60 ) 61 self.assertEqual(mock.call_count, 2) 62 self.assertEqual(mock.request_history[0].method, "GET") 63 self.assertEqual(mock.request_history[1].method, "POST") 64 self.assertJSONEqual( 65 mock.request_history[1].body, 66 { 67 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 68 "externalId": str(group.pk), 69 "displayName": group.name, 70 }, 71 ) 72 73 @Mocker() 74 def test_group_create_update(self, mock: Mocker): 75 """Test group creation and update""" 76 scim_id = generate_id() 77 mock.get( 78 "https://localhost/ServiceProviderConfig", 79 json={}, 80 ) 81 mock.post( 82 "https://localhost/Groups", 83 json={ 84 "id": scim_id, 85 }, 86 ) 87 mock.put( 88 "https://localhost/Groups", 89 json={ 90 "id": scim_id, 91 }, 92 ) 93 uid = generate_id() 94 group = Group.objects.create( 95 name=uid, 96 ) 97 self.assertEqual(mock.call_count, 2) 98 self.assertEqual(mock.request_history[0].method, "GET") 99 self.assertEqual(mock.request_history[1].method, "POST") 100 body = loads(mock.request_history[1].body) 101 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 102 validate(body, loads(schema.read())) 103 self.assertEqual( 104 body, 105 { 106 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 107 "externalId": str(group.pk), 108 "displayName": group.name, 109 }, 110 ) 111 group.name = generate_id() 112 group.save() 113 self.assertEqual(mock.call_count, 3) 114 self.assertEqual(mock.request_history[0].method, "GET") 115 self.assertEqual(mock.request_history[1].method, "POST") 116 self.assertEqual(mock.request_history[2].method, "PUT") 117 118 @Mocker() 119 def test_group_create_delete(self, mock: Mocker): 120 """Test group creation""" 121 scim_id = generate_id() 122 mock.get( 123 "https://localhost/ServiceProviderConfig", 124 json={}, 125 ) 126 mock.post( 127 "https://localhost/Groups", 128 json={ 129 "id": scim_id, 130 }, 131 ) 132 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 133 uid = generate_id() 134 group = Group.objects.create( 135 name=uid, 136 ) 137 self.assertEqual(mock.call_count, 2) 138 self.assertEqual(mock.request_history[0].method, "GET") 139 self.assertEqual(mock.request_history[1].method, "POST") 140 self.assertJSONEqual( 141 mock.request_history[1].body, 142 { 143 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 144 "externalId": str(group.pk), 145 "displayName": group.name, 146 }, 147 ) 148 group.delete() 149 self.assertEqual(mock.call_count, 3) 150 self.assertEqual(mock.request_history[0].method, "GET") 151 self.assertEqual(mock.request_history[2].method, "DELETE") 152 self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}") 153 154 @Mocker() 155 def test_group_create_update_noop(self, mock: Mocker): 156 """Test group creation and noop update""" 157 scim_id = generate_id() 158 mock.get( 159 "https://localhost/ServiceProviderConfig", 160 json={}, 161 ) 162 mock.post( 163 "https://localhost/Groups", 164 json={ 165 "id": scim_id, 166 }, 167 ) 168 uid = generate_id() 169 group = Group.objects.create( 170 name=uid, 171 ) 172 self.assertEqual(mock.call_count, 2) 173 self.assertEqual(mock.request_history[0].method, "GET") 174 self.assertEqual(mock.request_history[1].method, "POST") 175 body = loads(mock.request_history[1].body) 176 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 177 validate(body, loads(schema.read())) 178 self.assertEqual( 179 body, 180 { 181 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 182 "externalId": str(group.pk), 183 "displayName": group.name, 184 }, 185 ) 186 conn = SCIMProviderGroup.objects.filter(group=group).first() 187 conn.attributes = { 188 "id": scim_id, 189 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 190 "externalId": str(group.pk), 191 "displayName": group.name, 192 } 193 conn.save() 194 mock.get( 195 f"https://localhost/Groups/{scim_id}", 196 json={ 197 "id": scim_id, 198 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 199 "externalId": str(group.pk), 200 "displayName": group.name, 201 "members": [], 202 }, 203 ) 204 group.save() 205 self.assertEqual(mock.call_count, 3) 206 self.assertEqual(mock.request_history[0].method, "GET") 207 self.assertEqual(mock.request_history[1].method, "POST") 208 self.assertEqual(mock.request_history[2].method, "GET") 209 self.assertNotIn("PUT", [req.method for req in mock.request_history]) 210 211 @Mocker() 212 def test_discover(self, mock: Mocker): 213 group = Group.objects.create(name="acl_admins") 214 mock.get( 215 "https://localhost/ServiceProviderConfig", 216 json={}, 217 ) 218 mock.get( 219 "https://localhost/Groups", 220 json={ 221 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 222 "totalResults": 2, 223 "startIndex": 1, 224 "itemsPerPage": 1, 225 "Resources": [ 226 { 227 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 228 "id": "3", 229 "displayName": "acl_admins", 230 "meta": {"resourceType": "Group"}, 231 "members": [], 232 }, 233 ], 234 }, 235 ) 236 mock.get( 237 "https://localhost/Groups?startIndex=2", 238 json={ 239 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 240 "totalResults": 2, 241 "startIndex": 2, 242 "itemsPerPage": 1, 243 "Resources": [ 244 { 245 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 246 "id": "10", 247 "displayName": "test", 248 "meta": {"resourceType": "Group"}, 249 "members": [], 250 }, 251 ], 252 }, 253 ) 254 self.provider.client_for_model(Group).discover() 255 connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first() 256 self.assertIsNotNone(connection) 257 self.assertEqual(connection.scim_id, "3") 258 259 def _create_stale_provider_group(self, scim_id: str) -> Group: 260 """Create a group that is outside the provider's scope (via group_filters) with an 261 existing SCIMProviderGroup, simulating a previously synced group now out of scope.""" 262 self.app.backchannel_providers.remove(self.provider) 263 anchor = Group.objects.create(name=generate_id()) 264 stale = Group.objects.create(name=generate_id()) 265 self.app.backchannel_providers.add(self.provider) 266 267 self.provider.group_filters.set([anchor]) 268 SCIMProviderGroup.objects.create(provider=self.provider, group=stale, scim_id=scim_id) 269 return stale 270 271 @Mocker() 272 def test_sync_cleanup_stale_group_delete(self, mock: Mocker): 273 """Stale (out-of-scope) groups are deleted during full sync cleanup""" 274 scim_id = generate_id() 275 mock.get("https://localhost/ServiceProviderConfig", json={}) 276 277 mock.post("https://localhost/Groups", json={"id": generate_id()}) 278 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 279 self._create_stale_provider_group(scim_id) 280 281 scim_sync.send(self.provider.pk).get_result() 282 283 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 284 self.assertEqual(len(delete_reqs), 1) 285 self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}") 286 self.assertFalse( 287 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 288 ) 289 290 @Mocker() 291 def test_sync_cleanup_stale_group_not_found(self, mock: Mocker): 292 """Stale group cleanup handles 404 from the remote gracefully""" 293 scim_id = generate_id() 294 mock.get("https://localhost/ServiceProviderConfig", json={}) 295 mock.post("https://localhost/Groups", json={"id": generate_id()}) 296 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404) 297 self._create_stale_provider_group(scim_id) 298 299 scim_sync.send(self.provider.pk).get_result() 300 301 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 302 self.assertEqual(len(delete_reqs), 1) 303 304 self.assertFalse( 305 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 306 ) 307 308 @Mocker() 309 def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker): 310 """Stale group cleanup logs and retries on transient HTTP errors""" 311 scim_id = generate_id() 312 mock.get("https://localhost/ServiceProviderConfig", json={}) 313 mock.post("https://localhost/Groups", json={"id": generate_id()}) 314 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429) 315 self._create_stale_provider_group(scim_id) 316 317 scim_sync.send(self.provider.pk) 318 319 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 320 self.assertEqual(len(delete_reqs), 1) 321 322 @Mocker() 323 def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker): 324 """Stale group cleanup skips HTTP DELETE in dry_run mode""" 325 self.provider.dry_run = True 326 self.provider.save() 327 scim_id = generate_id() 328 mock.get("https://localhost/ServiceProviderConfig", json={}) 329 self._create_stale_provider_group(scim_id) 330 331 scim_sync.send(self.provider.pk) 332 333 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 334 self.assertEqual(len(delete_reqs), 0)
SCIM Group tests
@apply_blueprint('system/providers-scim.yaml')
def
setUp(self) -> None:
20 @apply_blueprint("system/providers-scim.yaml") 21 def setUp(self) -> None: 22 # Delete all users and groups as the mocked HTTP responses only return one ID 23 # which will cause errors with multiple users 24 User.objects.all().exclude_anonymous().delete() 25 Group.objects.all().delete() 26 self.provider: SCIMProvider = SCIMProvider.objects.create( 27 name=generate_id(), 28 url="https://localhost", 29 token=generate_id(), 30 ) 31 self.app: Application = Application.objects.create( 32 name=generate_id(), 33 slug=generate_id(), 34 ) 35 self.app.backchannel_providers.add(self.provider) 36 self.provider.property_mappings.set( 37 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")] 38 ) 39 self.provider.property_mappings_group.set( 40 [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")] 41 )
Hook method for setting up the test fixture before exercising it.
@Mocker()
def
test_group_create(self, mock: requests_mock.mocker.Mocker):
43 @Mocker() 44 def test_group_create(self, mock: Mocker): 45 """Test group creation""" 46 scim_id = generate_id() 47 mock.get( 48 "https://localhost/ServiceProviderConfig", 49 json={}, 50 ) 51 mock.post( 52 "https://localhost/Groups", 53 json={ 54 "id": scim_id, 55 }, 56 ) 57 uid = generate_id() 58 group = Group.objects.create( 59 name=uid, 60 ) 61 self.assertEqual(mock.call_count, 2) 62 self.assertEqual(mock.request_history[0].method, "GET") 63 self.assertEqual(mock.request_history[1].method, "POST") 64 self.assertJSONEqual( 65 mock.request_history[1].body, 66 { 67 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 68 "externalId": str(group.pk), 69 "displayName": group.name, 70 }, 71 )
Test group creation
@Mocker()
def
test_group_create_update(self, mock: requests_mock.mocker.Mocker):
73 @Mocker() 74 def test_group_create_update(self, mock: Mocker): 75 """Test group creation and update""" 76 scim_id = generate_id() 77 mock.get( 78 "https://localhost/ServiceProviderConfig", 79 json={}, 80 ) 81 mock.post( 82 "https://localhost/Groups", 83 json={ 84 "id": scim_id, 85 }, 86 ) 87 mock.put( 88 "https://localhost/Groups", 89 json={ 90 "id": scim_id, 91 }, 92 ) 93 uid = generate_id() 94 group = Group.objects.create( 95 name=uid, 96 ) 97 self.assertEqual(mock.call_count, 2) 98 self.assertEqual(mock.request_history[0].method, "GET") 99 self.assertEqual(mock.request_history[1].method, "POST") 100 body = loads(mock.request_history[1].body) 101 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 102 validate(body, loads(schema.read())) 103 self.assertEqual( 104 body, 105 { 106 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 107 "externalId": str(group.pk), 108 "displayName": group.name, 109 }, 110 ) 111 group.name = generate_id() 112 group.save() 113 self.assertEqual(mock.call_count, 3) 114 self.assertEqual(mock.request_history[0].method, "GET") 115 self.assertEqual(mock.request_history[1].method, "POST") 116 self.assertEqual(mock.request_history[2].method, "PUT")
Test group creation and update
@Mocker()
def
test_group_create_delete(self, mock: requests_mock.mocker.Mocker):
118 @Mocker() 119 def test_group_create_delete(self, mock: Mocker): 120 """Test group creation""" 121 scim_id = generate_id() 122 mock.get( 123 "https://localhost/ServiceProviderConfig", 124 json={}, 125 ) 126 mock.post( 127 "https://localhost/Groups", 128 json={ 129 "id": scim_id, 130 }, 131 ) 132 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 133 uid = generate_id() 134 group = Group.objects.create( 135 name=uid, 136 ) 137 self.assertEqual(mock.call_count, 2) 138 self.assertEqual(mock.request_history[0].method, "GET") 139 self.assertEqual(mock.request_history[1].method, "POST") 140 self.assertJSONEqual( 141 mock.request_history[1].body, 142 { 143 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 144 "externalId": str(group.pk), 145 "displayName": group.name, 146 }, 147 ) 148 group.delete() 149 self.assertEqual(mock.call_count, 3) 150 self.assertEqual(mock.request_history[0].method, "GET") 151 self.assertEqual(mock.request_history[2].method, "DELETE") 152 self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}")
Test group creation
@Mocker()
def
test_group_create_update_noop(self, mock: requests_mock.mocker.Mocker):
154 @Mocker() 155 def test_group_create_update_noop(self, mock: Mocker): 156 """Test group creation and noop update""" 157 scim_id = generate_id() 158 mock.get( 159 "https://localhost/ServiceProviderConfig", 160 json={}, 161 ) 162 mock.post( 163 "https://localhost/Groups", 164 json={ 165 "id": scim_id, 166 }, 167 ) 168 uid = generate_id() 169 group = Group.objects.create( 170 name=uid, 171 ) 172 self.assertEqual(mock.call_count, 2) 173 self.assertEqual(mock.request_history[0].method, "GET") 174 self.assertEqual(mock.request_history[1].method, "POST") 175 body = loads(mock.request_history[1].body) 176 with open("schemas/scim-group.schema.json", encoding="utf-8") as schema: 177 validate(body, loads(schema.read())) 178 self.assertEqual( 179 body, 180 { 181 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 182 "externalId": str(group.pk), 183 "displayName": group.name, 184 }, 185 ) 186 conn = SCIMProviderGroup.objects.filter(group=group).first() 187 conn.attributes = { 188 "id": scim_id, 189 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 190 "externalId": str(group.pk), 191 "displayName": group.name, 192 } 193 conn.save() 194 mock.get( 195 f"https://localhost/Groups/{scim_id}", 196 json={ 197 "id": scim_id, 198 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 199 "externalId": str(group.pk), 200 "displayName": group.name, 201 "members": [], 202 }, 203 ) 204 group.save() 205 self.assertEqual(mock.call_count, 3) 206 self.assertEqual(mock.request_history[0].method, "GET") 207 self.assertEqual(mock.request_history[1].method, "POST") 208 self.assertEqual(mock.request_history[2].method, "GET") 209 self.assertNotIn("PUT", [req.method for req in mock.request_history])
Test group creation and noop update
@Mocker()
def
test_discover(self, mock: requests_mock.mocker.Mocker):
211 @Mocker() 212 def test_discover(self, mock: Mocker): 213 group = Group.objects.create(name="acl_admins") 214 mock.get( 215 "https://localhost/ServiceProviderConfig", 216 json={}, 217 ) 218 mock.get( 219 "https://localhost/Groups", 220 json={ 221 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 222 "totalResults": 2, 223 "startIndex": 1, 224 "itemsPerPage": 1, 225 "Resources": [ 226 { 227 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 228 "id": "3", 229 "displayName": "acl_admins", 230 "meta": {"resourceType": "Group"}, 231 "members": [], 232 }, 233 ], 234 }, 235 ) 236 mock.get( 237 "https://localhost/Groups?startIndex=2", 238 json={ 239 "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"], 240 "totalResults": 2, 241 "startIndex": 2, 242 "itemsPerPage": 1, 243 "Resources": [ 244 { 245 "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], 246 "id": "10", 247 "displayName": "test", 248 "meta": {"resourceType": "Group"}, 249 "members": [], 250 }, 251 ], 252 }, 253 ) 254 self.provider.client_for_model(Group).discover() 255 connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first() 256 self.assertIsNotNone(connection) 257 self.assertEqual(connection.scim_id, "3")
@Mocker()
def
test_sync_cleanup_stale_group_delete(self, mock: requests_mock.mocker.Mocker):
271 @Mocker() 272 def test_sync_cleanup_stale_group_delete(self, mock: Mocker): 273 """Stale (out-of-scope) groups are deleted during full sync cleanup""" 274 scim_id = generate_id() 275 mock.get("https://localhost/ServiceProviderConfig", json={}) 276 277 mock.post("https://localhost/Groups", json={"id": generate_id()}) 278 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204) 279 self._create_stale_provider_group(scim_id) 280 281 scim_sync.send(self.provider.pk).get_result() 282 283 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 284 self.assertEqual(len(delete_reqs), 1) 285 self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}") 286 self.assertFalse( 287 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 288 )
Stale (out-of-scope) groups are deleted during full sync cleanup
@Mocker()
def
test_sync_cleanup_stale_group_not_found(self, mock: requests_mock.mocker.Mocker):
290 @Mocker() 291 def test_sync_cleanup_stale_group_not_found(self, mock: Mocker): 292 """Stale group cleanup handles 404 from the remote gracefully""" 293 scim_id = generate_id() 294 mock.get("https://localhost/ServiceProviderConfig", json={}) 295 mock.post("https://localhost/Groups", json={"id": generate_id()}) 296 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404) 297 self._create_stale_provider_group(scim_id) 298 299 scim_sync.send(self.provider.pk).get_result() 300 301 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 302 self.assertEqual(len(delete_reqs), 1) 303 304 self.assertFalse( 305 SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists() 306 )
Stale group cleanup handles 404 from the remote gracefully
@Mocker()
def
test_sync_cleanup_stale_group_transient_error(self, mock: requests_mock.mocker.Mocker):
308 @Mocker() 309 def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker): 310 """Stale group cleanup logs and retries on transient HTTP errors""" 311 scim_id = generate_id() 312 mock.get("https://localhost/ServiceProviderConfig", json={}) 313 mock.post("https://localhost/Groups", json={"id": generate_id()}) 314 mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429) 315 self._create_stale_provider_group(scim_id) 316 317 scim_sync.send(self.provider.pk) 318 319 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 320 self.assertEqual(len(delete_reqs), 1)
Stale group cleanup logs and retries on transient HTTP errors
@Mocker()
def
test_sync_cleanup_stale_group_dry_run(self, mock: requests_mock.mocker.Mocker):
322 @Mocker() 323 def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker): 324 """Stale group cleanup skips HTTP DELETE in dry_run mode""" 325 self.provider.dry_run = True 326 self.provider.save() 327 scim_id = generate_id() 328 mock.get("https://localhost/ServiceProviderConfig", json={}) 329 self._create_stale_provider_group(scim_id) 330 331 scim_sync.send(self.provider.pk) 332 333 delete_reqs = [r for r in mock.request_history if r.method == "DELETE"] 334 self.assertEqual(len(delete_reqs), 0)
Stale group cleanup skips HTTP DELETE in dry_run mode