authentik.providers.scim.tests.test_group

SCIM Group tests

  1"""SCIM Group tests"""
  2
  3from json import loads
  4
  5from django.test import TestCase
  6from jsonschema import validate
  7from requests_mock import Mocker
  8
  9from authentik.blueprints.tests import apply_blueprint
 10from authentik.core.models import Application, Group, User
 11from authentik.lib.generators import generate_id
 12from authentik.providers.scim.models import SCIMMapping, SCIMProvider, SCIMProviderGroup
 13from authentik.providers.scim.tasks import scim_sync
 14
 15
 16class SCIMGroupTests(TestCase):
 17    """SCIM Group tests"""
 18
 19    @apply_blueprint("system/providers-scim.yaml")
 20    def setUp(self) -> None:
 21        # Delete all users and groups as the mocked HTTP responses only return one ID
 22        # which will cause errors with multiple users
 23        User.objects.all().exclude_anonymous().delete()
 24        Group.objects.all().delete()
 25        self.provider: SCIMProvider = SCIMProvider.objects.create(
 26            name=generate_id(),
 27            url="https://localhost",
 28            token=generate_id(),
 29        )
 30        self.app: Application = Application.objects.create(
 31            name=generate_id(),
 32            slug=generate_id(),
 33        )
 34        self.app.backchannel_providers.add(self.provider)
 35        self.provider.property_mappings.set(
 36            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")]
 37        )
 38        self.provider.property_mappings_group.set(
 39            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")]
 40        )
 41
 42    @Mocker()
 43    def test_group_create(self, mock: Mocker):
 44        """Test group creation"""
 45        scim_id = generate_id()
 46        mock.get(
 47            "https://localhost/ServiceProviderConfig",
 48            json={},
 49        )
 50        mock.post(
 51            "https://localhost/Groups",
 52            json={
 53                "id": scim_id,
 54            },
 55        )
 56        uid = generate_id()
 57        group = Group.objects.create(
 58            name=uid,
 59        )
 60        self.assertEqual(mock.call_count, 2)
 61        self.assertEqual(mock.request_history[0].method, "GET")
 62        self.assertEqual(mock.request_history[1].method, "POST")
 63        self.assertJSONEqual(
 64            mock.request_history[1].body,
 65            {
 66                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
 67                "externalId": str(group.pk),
 68                "displayName": group.name,
 69            },
 70        )
 71
 72    @Mocker()
 73    def test_group_create_update(self, mock: Mocker):
 74        """Test group creation and update"""
 75        scim_id = generate_id()
 76        mock.get(
 77            "https://localhost/ServiceProviderConfig",
 78            json={},
 79        )
 80        mock.post(
 81            "https://localhost/Groups",
 82            json={
 83                "id": scim_id,
 84            },
 85        )
 86        mock.put(
 87            "https://localhost/Groups",
 88            json={
 89                "id": scim_id,
 90            },
 91        )
 92        uid = generate_id()
 93        group = Group.objects.create(
 94            name=uid,
 95        )
 96        self.assertEqual(mock.call_count, 2)
 97        self.assertEqual(mock.request_history[0].method, "GET")
 98        self.assertEqual(mock.request_history[1].method, "POST")
 99        body = loads(mock.request_history[1].body)
100        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
101            validate(body, loads(schema.read()))
102        self.assertEqual(
103            body,
104            {
105                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
106                "externalId": str(group.pk),
107                "displayName": group.name,
108            },
109        )
110        group.name = generate_id()
111        group.save()
112        self.assertEqual(mock.call_count, 3)
113        self.assertEqual(mock.request_history[0].method, "GET")
114        self.assertEqual(mock.request_history[1].method, "POST")
115        self.assertEqual(mock.request_history[2].method, "PUT")
116
117    @Mocker()
118    def test_group_create_delete(self, mock: Mocker):
119        """Test group creation"""
120        scim_id = generate_id()
121        mock.get(
122            "https://localhost/ServiceProviderConfig",
123            json={},
124        )
125        mock.post(
126            "https://localhost/Groups",
127            json={
128                "id": scim_id,
129            },
130        )
131        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
132        uid = generate_id()
133        group = Group.objects.create(
134            name=uid,
135        )
136        self.assertEqual(mock.call_count, 2)
137        self.assertEqual(mock.request_history[0].method, "GET")
138        self.assertEqual(mock.request_history[1].method, "POST")
139        self.assertJSONEqual(
140            mock.request_history[1].body,
141            {
142                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
143                "externalId": str(group.pk),
144                "displayName": group.name,
145            },
146        )
147        group.delete()
148        self.assertEqual(mock.call_count, 3)
149        self.assertEqual(mock.request_history[0].method, "GET")
150        self.assertEqual(mock.request_history[2].method, "DELETE")
151        self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}")
152
153    @Mocker()
154    def test_group_create_update_noop(self, mock: Mocker):
155        """Test group creation and noop update"""
156        scim_id = generate_id()
157        mock.get(
158            "https://localhost/ServiceProviderConfig",
159            json={},
160        )
161        mock.post(
162            "https://localhost/Groups",
163            json={
164                "id": scim_id,
165            },
166        )
167        uid = generate_id()
168        group = Group.objects.create(
169            name=uid,
170        )
171        self.assertEqual(mock.call_count, 2)
172        self.assertEqual(mock.request_history[0].method, "GET")
173        self.assertEqual(mock.request_history[1].method, "POST")
174        body = loads(mock.request_history[1].body)
175        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
176            validate(body, loads(schema.read()))
177        self.assertEqual(
178            body,
179            {
180                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
181                "externalId": str(group.pk),
182                "displayName": group.name,
183            },
184        )
185        conn = SCIMProviderGroup.objects.filter(group=group).first()
186        conn.attributes = {
187            "id": scim_id,
188            "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
189            "externalId": str(group.pk),
190            "displayName": group.name,
191        }
192        conn.save()
193        mock.get(
194            f"https://localhost/Groups/{scim_id}",
195            json={
196                "id": scim_id,
197                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
198                "externalId": str(group.pk),
199                "displayName": group.name,
200                "members": [],
201            },
202        )
203        group.save()
204        self.assertEqual(mock.call_count, 3)
205        self.assertEqual(mock.request_history[0].method, "GET")
206        self.assertEqual(mock.request_history[1].method, "POST")
207        self.assertEqual(mock.request_history[2].method, "GET")
208        self.assertNotIn("PUT", [req.method for req in mock.request_history])
209
210    @Mocker()
211    def test_discover(self, mock: Mocker):
212        group = Group.objects.create(name="acl_admins")
213        mock.get(
214            "https://localhost/ServiceProviderConfig",
215            json={},
216        )
217        mock.get(
218            "https://localhost/Groups",
219            json={
220                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
221                "totalResults": 2,
222                "startIndex": 1,
223                "itemsPerPage": 1,
224                "Resources": [
225                    {
226                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
227                        "id": "3",
228                        "displayName": "acl_admins",
229                        "meta": {"resourceType": "Group"},
230                        "members": [],
231                    },
232                ],
233            },
234        )
235        mock.get(
236            "https://localhost/Groups?startIndex=2",
237            json={
238                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
239                "totalResults": 2,
240                "startIndex": 2,
241                "itemsPerPage": 1,
242                "Resources": [
243                    {
244                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
245                        "id": "10",
246                        "displayName": "test",
247                        "meta": {"resourceType": "Group"},
248                        "members": [],
249                    },
250                ],
251            },
252        )
253        self.provider.client_for_model(Group).discover()
254        connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first()
255        self.assertIsNotNone(connection)
256        self.assertEqual(connection.scim_id, "3")
257
258    def _create_stale_provider_group(self, scim_id: str) -> Group:
259        """Create a group that is outside the provider's scope (via group_filters) with an
260        existing SCIMProviderGroup, simulating a previously synced group now out of scope."""
261        self.app.backchannel_providers.remove(self.provider)
262        anchor = Group.objects.create(name=generate_id())
263        stale = Group.objects.create(name=generate_id())
264        self.app.backchannel_providers.add(self.provider)
265
266        self.provider.group_filters.set([anchor])
267        SCIMProviderGroup.objects.create(provider=self.provider, group=stale, scim_id=scim_id)
268        return stale
269
270    @Mocker()
271    def test_sync_cleanup_stale_group_delete(self, mock: Mocker):
272        """Stale (out-of-scope) groups are deleted during full sync cleanup"""
273        scim_id = generate_id()
274        mock.get("https://localhost/ServiceProviderConfig", json={})
275
276        mock.post("https://localhost/Groups", json={"id": generate_id()})
277        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
278        self._create_stale_provider_group(scim_id)
279
280        scim_sync.send(self.provider.pk).get_result()
281
282        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
283        self.assertEqual(len(delete_reqs), 1)
284        self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}")
285        self.assertFalse(
286            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
287        )
288
289    @Mocker()
290    def test_sync_cleanup_stale_group_not_found(self, mock: Mocker):
291        """Stale group cleanup handles 404 from the remote gracefully"""
292        scim_id = generate_id()
293        mock.get("https://localhost/ServiceProviderConfig", json={})
294        mock.post("https://localhost/Groups", json={"id": generate_id()})
295        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404)
296        self._create_stale_provider_group(scim_id)
297
298        scim_sync.send(self.provider.pk).get_result()
299
300        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
301        self.assertEqual(len(delete_reqs), 1)
302
303        self.assertFalse(
304            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
305        )
306
307    @Mocker()
308    def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker):
309        """Stale group cleanup logs and retries on transient HTTP errors"""
310        scim_id = generate_id()
311        mock.get("https://localhost/ServiceProviderConfig", json={})
312        mock.post("https://localhost/Groups", json={"id": generate_id()})
313        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429)
314        self._create_stale_provider_group(scim_id)
315
316        scim_sync.send(self.provider.pk)
317
318        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
319        self.assertEqual(len(delete_reqs), 1)
320
321    @Mocker()
322    def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker):
323        """Stale group cleanup skips HTTP DELETE in dry_run mode"""
324        self.provider.dry_run = True
325        self.provider.save()
326        scim_id = generate_id()
327        mock.get("https://localhost/ServiceProviderConfig", json={})
328        self._create_stale_provider_group(scim_id)
329
330        scim_sync.send(self.provider.pk)
331
332        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
333        self.assertEqual(len(delete_reqs), 0)
class SCIMGroupTests(django.test.testcases.TestCase):
 17class SCIMGroupTests(TestCase):
 18    """SCIM Group tests"""
 19
 20    @apply_blueprint("system/providers-scim.yaml")
 21    def setUp(self) -> None:
 22        # Delete all users and groups as the mocked HTTP responses only return one ID
 23        # which will cause errors with multiple users
 24        User.objects.all().exclude_anonymous().delete()
 25        Group.objects.all().delete()
 26        self.provider: SCIMProvider = SCIMProvider.objects.create(
 27            name=generate_id(),
 28            url="https://localhost",
 29            token=generate_id(),
 30        )
 31        self.app: Application = Application.objects.create(
 32            name=generate_id(),
 33            slug=generate_id(),
 34        )
 35        self.app.backchannel_providers.add(self.provider)
 36        self.provider.property_mappings.set(
 37            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")]
 38        )
 39        self.provider.property_mappings_group.set(
 40            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")]
 41        )
 42
 43    @Mocker()
 44    def test_group_create(self, mock: Mocker):
 45        """Test group creation"""
 46        scim_id = generate_id()
 47        mock.get(
 48            "https://localhost/ServiceProviderConfig",
 49            json={},
 50        )
 51        mock.post(
 52            "https://localhost/Groups",
 53            json={
 54                "id": scim_id,
 55            },
 56        )
 57        uid = generate_id()
 58        group = Group.objects.create(
 59            name=uid,
 60        )
 61        self.assertEqual(mock.call_count, 2)
 62        self.assertEqual(mock.request_history[0].method, "GET")
 63        self.assertEqual(mock.request_history[1].method, "POST")
 64        self.assertJSONEqual(
 65            mock.request_history[1].body,
 66            {
 67                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
 68                "externalId": str(group.pk),
 69                "displayName": group.name,
 70            },
 71        )
 72
 73    @Mocker()
 74    def test_group_create_update(self, mock: Mocker):
 75        """Test group creation and update"""
 76        scim_id = generate_id()
 77        mock.get(
 78            "https://localhost/ServiceProviderConfig",
 79            json={},
 80        )
 81        mock.post(
 82            "https://localhost/Groups",
 83            json={
 84                "id": scim_id,
 85            },
 86        )
 87        mock.put(
 88            "https://localhost/Groups",
 89            json={
 90                "id": scim_id,
 91            },
 92        )
 93        uid = generate_id()
 94        group = Group.objects.create(
 95            name=uid,
 96        )
 97        self.assertEqual(mock.call_count, 2)
 98        self.assertEqual(mock.request_history[0].method, "GET")
 99        self.assertEqual(mock.request_history[1].method, "POST")
100        body = loads(mock.request_history[1].body)
101        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
102            validate(body, loads(schema.read()))
103        self.assertEqual(
104            body,
105            {
106                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
107                "externalId": str(group.pk),
108                "displayName": group.name,
109            },
110        )
111        group.name = generate_id()
112        group.save()
113        self.assertEqual(mock.call_count, 3)
114        self.assertEqual(mock.request_history[0].method, "GET")
115        self.assertEqual(mock.request_history[1].method, "POST")
116        self.assertEqual(mock.request_history[2].method, "PUT")
117
118    @Mocker()
119    def test_group_create_delete(self, mock: Mocker):
120        """Test group creation"""
121        scim_id = generate_id()
122        mock.get(
123            "https://localhost/ServiceProviderConfig",
124            json={},
125        )
126        mock.post(
127            "https://localhost/Groups",
128            json={
129                "id": scim_id,
130            },
131        )
132        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
133        uid = generate_id()
134        group = Group.objects.create(
135            name=uid,
136        )
137        self.assertEqual(mock.call_count, 2)
138        self.assertEqual(mock.request_history[0].method, "GET")
139        self.assertEqual(mock.request_history[1].method, "POST")
140        self.assertJSONEqual(
141            mock.request_history[1].body,
142            {
143                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
144                "externalId": str(group.pk),
145                "displayName": group.name,
146            },
147        )
148        group.delete()
149        self.assertEqual(mock.call_count, 3)
150        self.assertEqual(mock.request_history[0].method, "GET")
151        self.assertEqual(mock.request_history[2].method, "DELETE")
152        self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}")
153
154    @Mocker()
155    def test_group_create_update_noop(self, mock: Mocker):
156        """Test group creation and noop update"""
157        scim_id = generate_id()
158        mock.get(
159            "https://localhost/ServiceProviderConfig",
160            json={},
161        )
162        mock.post(
163            "https://localhost/Groups",
164            json={
165                "id": scim_id,
166            },
167        )
168        uid = generate_id()
169        group = Group.objects.create(
170            name=uid,
171        )
172        self.assertEqual(mock.call_count, 2)
173        self.assertEqual(mock.request_history[0].method, "GET")
174        self.assertEqual(mock.request_history[1].method, "POST")
175        body = loads(mock.request_history[1].body)
176        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
177            validate(body, loads(schema.read()))
178        self.assertEqual(
179            body,
180            {
181                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
182                "externalId": str(group.pk),
183                "displayName": group.name,
184            },
185        )
186        conn = SCIMProviderGroup.objects.filter(group=group).first()
187        conn.attributes = {
188            "id": scim_id,
189            "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
190            "externalId": str(group.pk),
191            "displayName": group.name,
192        }
193        conn.save()
194        mock.get(
195            f"https://localhost/Groups/{scim_id}",
196            json={
197                "id": scim_id,
198                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
199                "externalId": str(group.pk),
200                "displayName": group.name,
201                "members": [],
202            },
203        )
204        group.save()
205        self.assertEqual(mock.call_count, 3)
206        self.assertEqual(mock.request_history[0].method, "GET")
207        self.assertEqual(mock.request_history[1].method, "POST")
208        self.assertEqual(mock.request_history[2].method, "GET")
209        self.assertNotIn("PUT", [req.method for req in mock.request_history])
210
211    @Mocker()
212    def test_discover(self, mock: Mocker):
213        group = Group.objects.create(name="acl_admins")
214        mock.get(
215            "https://localhost/ServiceProviderConfig",
216            json={},
217        )
218        mock.get(
219            "https://localhost/Groups",
220            json={
221                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
222                "totalResults": 2,
223                "startIndex": 1,
224                "itemsPerPage": 1,
225                "Resources": [
226                    {
227                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
228                        "id": "3",
229                        "displayName": "acl_admins",
230                        "meta": {"resourceType": "Group"},
231                        "members": [],
232                    },
233                ],
234            },
235        )
236        mock.get(
237            "https://localhost/Groups?startIndex=2",
238            json={
239                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
240                "totalResults": 2,
241                "startIndex": 2,
242                "itemsPerPage": 1,
243                "Resources": [
244                    {
245                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
246                        "id": "10",
247                        "displayName": "test",
248                        "meta": {"resourceType": "Group"},
249                        "members": [],
250                    },
251                ],
252            },
253        )
254        self.provider.client_for_model(Group).discover()
255        connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first()
256        self.assertIsNotNone(connection)
257        self.assertEqual(connection.scim_id, "3")
258
259    def _create_stale_provider_group(self, scim_id: str) -> Group:
260        """Create a group that is outside the provider's scope (via group_filters) with an
261        existing SCIMProviderGroup, simulating a previously synced group now out of scope."""
262        self.app.backchannel_providers.remove(self.provider)
263        anchor = Group.objects.create(name=generate_id())
264        stale = Group.objects.create(name=generate_id())
265        self.app.backchannel_providers.add(self.provider)
266
267        self.provider.group_filters.set([anchor])
268        SCIMProviderGroup.objects.create(provider=self.provider, group=stale, scim_id=scim_id)
269        return stale
270
271    @Mocker()
272    def test_sync_cleanup_stale_group_delete(self, mock: Mocker):
273        """Stale (out-of-scope) groups are deleted during full sync cleanup"""
274        scim_id = generate_id()
275        mock.get("https://localhost/ServiceProviderConfig", json={})
276
277        mock.post("https://localhost/Groups", json={"id": generate_id()})
278        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
279        self._create_stale_provider_group(scim_id)
280
281        scim_sync.send(self.provider.pk).get_result()
282
283        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
284        self.assertEqual(len(delete_reqs), 1)
285        self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}")
286        self.assertFalse(
287            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
288        )
289
290    @Mocker()
291    def test_sync_cleanup_stale_group_not_found(self, mock: Mocker):
292        """Stale group cleanup handles 404 from the remote gracefully"""
293        scim_id = generate_id()
294        mock.get("https://localhost/ServiceProviderConfig", json={})
295        mock.post("https://localhost/Groups", json={"id": generate_id()})
296        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404)
297        self._create_stale_provider_group(scim_id)
298
299        scim_sync.send(self.provider.pk).get_result()
300
301        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
302        self.assertEqual(len(delete_reqs), 1)
303
304        self.assertFalse(
305            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
306        )
307
308    @Mocker()
309    def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker):
310        """Stale group cleanup logs and retries on transient HTTP errors"""
311        scim_id = generate_id()
312        mock.get("https://localhost/ServiceProviderConfig", json={})
313        mock.post("https://localhost/Groups", json={"id": generate_id()})
314        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429)
315        self._create_stale_provider_group(scim_id)
316
317        scim_sync.send(self.provider.pk)
318
319        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
320        self.assertEqual(len(delete_reqs), 1)
321
322    @Mocker()
323    def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker):
324        """Stale group cleanup skips HTTP DELETE in dry_run mode"""
325        self.provider.dry_run = True
326        self.provider.save()
327        scim_id = generate_id()
328        mock.get("https://localhost/ServiceProviderConfig", json={})
329        self._create_stale_provider_group(scim_id)
330
331        scim_sync.send(self.provider.pk)
332
333        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
334        self.assertEqual(len(delete_reqs), 0)

SCIM Group tests

@apply_blueprint('system/providers-scim.yaml')
def setUp(self) -> None:
20    @apply_blueprint("system/providers-scim.yaml")
21    def setUp(self) -> None:
22        # Delete all users and groups as the mocked HTTP responses only return one ID
23        # which will cause errors with multiple users
24        User.objects.all().exclude_anonymous().delete()
25        Group.objects.all().delete()
26        self.provider: SCIMProvider = SCIMProvider.objects.create(
27            name=generate_id(),
28            url="https://localhost",
29            token=generate_id(),
30        )
31        self.app: Application = Application.objects.create(
32            name=generate_id(),
33            slug=generate_id(),
34        )
35        self.app.backchannel_providers.add(self.provider)
36        self.provider.property_mappings.set(
37            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/user")]
38        )
39        self.provider.property_mappings_group.set(
40            [SCIMMapping.objects.get(managed="goauthentik.io/providers/scim/group")]
41        )

Hook method for setting up the test fixture before exercising it.

@Mocker()
def test_group_create(self, mock: requests_mock.mocker.Mocker):
43    @Mocker()
44    def test_group_create(self, mock: Mocker):
45        """Test group creation"""
46        scim_id = generate_id()
47        mock.get(
48            "https://localhost/ServiceProviderConfig",
49            json={},
50        )
51        mock.post(
52            "https://localhost/Groups",
53            json={
54                "id": scim_id,
55            },
56        )
57        uid = generate_id()
58        group = Group.objects.create(
59            name=uid,
60        )
61        self.assertEqual(mock.call_count, 2)
62        self.assertEqual(mock.request_history[0].method, "GET")
63        self.assertEqual(mock.request_history[1].method, "POST")
64        self.assertJSONEqual(
65            mock.request_history[1].body,
66            {
67                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
68                "externalId": str(group.pk),
69                "displayName": group.name,
70            },
71        )

Test group creation

@Mocker()
def test_group_create_update(self, mock: requests_mock.mocker.Mocker):
 73    @Mocker()
 74    def test_group_create_update(self, mock: Mocker):
 75        """Test group creation and update"""
 76        scim_id = generate_id()
 77        mock.get(
 78            "https://localhost/ServiceProviderConfig",
 79            json={},
 80        )
 81        mock.post(
 82            "https://localhost/Groups",
 83            json={
 84                "id": scim_id,
 85            },
 86        )
 87        mock.put(
 88            "https://localhost/Groups",
 89            json={
 90                "id": scim_id,
 91            },
 92        )
 93        uid = generate_id()
 94        group = Group.objects.create(
 95            name=uid,
 96        )
 97        self.assertEqual(mock.call_count, 2)
 98        self.assertEqual(mock.request_history[0].method, "GET")
 99        self.assertEqual(mock.request_history[1].method, "POST")
100        body = loads(mock.request_history[1].body)
101        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
102            validate(body, loads(schema.read()))
103        self.assertEqual(
104            body,
105            {
106                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
107                "externalId": str(group.pk),
108                "displayName": group.name,
109            },
110        )
111        group.name = generate_id()
112        group.save()
113        self.assertEqual(mock.call_count, 3)
114        self.assertEqual(mock.request_history[0].method, "GET")
115        self.assertEqual(mock.request_history[1].method, "POST")
116        self.assertEqual(mock.request_history[2].method, "PUT")

Test group creation and update

@Mocker()
def test_group_create_delete(self, mock: requests_mock.mocker.Mocker):
118    @Mocker()
119    def test_group_create_delete(self, mock: Mocker):
120        """Test group creation"""
121        scim_id = generate_id()
122        mock.get(
123            "https://localhost/ServiceProviderConfig",
124            json={},
125        )
126        mock.post(
127            "https://localhost/Groups",
128            json={
129                "id": scim_id,
130            },
131        )
132        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
133        uid = generate_id()
134        group = Group.objects.create(
135            name=uid,
136        )
137        self.assertEqual(mock.call_count, 2)
138        self.assertEqual(mock.request_history[0].method, "GET")
139        self.assertEqual(mock.request_history[1].method, "POST")
140        self.assertJSONEqual(
141            mock.request_history[1].body,
142            {
143                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
144                "externalId": str(group.pk),
145                "displayName": group.name,
146            },
147        )
148        group.delete()
149        self.assertEqual(mock.call_count, 3)
150        self.assertEqual(mock.request_history[0].method, "GET")
151        self.assertEqual(mock.request_history[2].method, "DELETE")
152        self.assertEqual(mock.request_history[2].url, f"https://localhost/Groups/{scim_id}")

Test group creation and deletion

@Mocker()
def test_group_create_update_noop(self, mock: requests_mock.mocker.Mocker):
154    @Mocker()
155    def test_group_create_update_noop(self, mock: Mocker):
156        """Test group creation and noop update"""
157        scim_id = generate_id()
158        mock.get(
159            "https://localhost/ServiceProviderConfig",
160            json={},
161        )
162        mock.post(
163            "https://localhost/Groups",
164            json={
165                "id": scim_id,
166            },
167        )
168        uid = generate_id()
169        group = Group.objects.create(
170            name=uid,
171        )
172        self.assertEqual(mock.call_count, 2)
173        self.assertEqual(mock.request_history[0].method, "GET")
174        self.assertEqual(mock.request_history[1].method, "POST")
175        body = loads(mock.request_history[1].body)
176        with open("schemas/scim-group.schema.json", encoding="utf-8") as schema:
177            validate(body, loads(schema.read()))
178        self.assertEqual(
179            body,
180            {
181                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
182                "externalId": str(group.pk),
183                "displayName": group.name,
184            },
185        )
186        conn = SCIMProviderGroup.objects.filter(group=group).first()
187        conn.attributes = {
188            "id": scim_id,
189            "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
190            "externalId": str(group.pk),
191            "displayName": group.name,
192        }
193        conn.save()
194        mock.get(
195            f"https://localhost/Groups/{scim_id}",
196            json={
197                "id": scim_id,
198                "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
199                "externalId": str(group.pk),
200                "displayName": group.name,
201                "members": [],
202            },
203        )
204        group.save()
205        self.assertEqual(mock.call_count, 3)
206        self.assertEqual(mock.request_history[0].method, "GET")
207        self.assertEqual(mock.request_history[1].method, "POST")
208        self.assertEqual(mock.request_history[2].method, "GET")
209        self.assertNotIn("PUT", [req.method for req in mock.request_history])

Test group creation and noop update

@Mocker()
def test_discover(self, mock: requests_mock.mocker.Mocker):
211    @Mocker()
212    def test_discover(self, mock: Mocker):
213        group = Group.objects.create(name="acl_admins")
214        mock.get(
215            "https://localhost/ServiceProviderConfig",
216            json={},
217        )
218        mock.get(
219            "https://localhost/Groups",
220            json={
221                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
222                "totalResults": 2,
223                "startIndex": 1,
224                "itemsPerPage": 1,
225                "Resources": [
226                    {
227                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
228                        "id": "3",
229                        "displayName": "acl_admins",
230                        "meta": {"resourceType": "Group"},
231                        "members": [],
232                    },
233                ],
234            },
235        )
236        mock.get(
237            "https://localhost/Groups?startIndex=2",
238            json={
239                "schemas": ["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
240                "totalResults": 2,
241                "startIndex": 2,
242                "itemsPerPage": 1,
243                "Resources": [
244                    {
245                        "schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"],
246                        "id": "10",
247                        "displayName": "test",
248                        "meta": {"resourceType": "Group"},
249                        "members": [],
250                    },
251                ],
252            },
253        )
254        self.provider.client_for_model(Group).discover()
255        connection = SCIMProviderGroup.objects.filter(provider=self.provider, group=group).first()
256        self.assertIsNotNone(connection)
257        self.assertEqual(connection.scim_id, "3")
@Mocker()
def test_sync_cleanup_stale_group_delete(self, mock: requests_mock.mocker.Mocker):
271    @Mocker()
272    def test_sync_cleanup_stale_group_delete(self, mock: Mocker):
273        """Stale (out-of-scope) groups are deleted during full sync cleanup"""
274        scim_id = generate_id()
275        mock.get("https://localhost/ServiceProviderConfig", json={})
276
277        mock.post("https://localhost/Groups", json={"id": generate_id()})
278        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=204)
279        self._create_stale_provider_group(scim_id)
280
281        scim_sync.send(self.provider.pk).get_result()
282
283        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
284        self.assertEqual(len(delete_reqs), 1)
285        self.assertEqual(delete_reqs[0].url, f"https://localhost/Groups/{scim_id}")
286        self.assertFalse(
287            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
288        )

Stale (out-of-scope) groups are deleted during full sync cleanup

@Mocker()
def test_sync_cleanup_stale_group_not_found(self, mock: requests_mock.mocker.Mocker):
290    @Mocker()
291    def test_sync_cleanup_stale_group_not_found(self, mock: Mocker):
292        """Stale group cleanup handles 404 from the remote gracefully"""
293        scim_id = generate_id()
294        mock.get("https://localhost/ServiceProviderConfig", json={})
295        mock.post("https://localhost/Groups", json={"id": generate_id()})
296        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=404)
297        self._create_stale_provider_group(scim_id)
298
299        scim_sync.send(self.provider.pk).get_result()
300
301        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
302        self.assertEqual(len(delete_reqs), 1)
303
304        self.assertFalse(
305            SCIMProviderGroup.objects.filter(provider=self.provider, scim_id=scim_id).exists()
306        )

Stale group cleanup handles 404 from the remote gracefully

@Mocker()
def test_sync_cleanup_stale_group_transient_error(self, mock: requests_mock.mocker.Mocker):
308    @Mocker()
309    def test_sync_cleanup_stale_group_transient_error(self, mock: Mocker):
310        """Stale group cleanup logs and retries on transient HTTP errors"""
311        scim_id = generate_id()
312        mock.get("https://localhost/ServiceProviderConfig", json={})
313        mock.post("https://localhost/Groups", json={"id": generate_id()})
314        mock.delete(f"https://localhost/Groups/{scim_id}", status_code=429)
315        self._create_stale_provider_group(scim_id)
316
317        scim_sync.send(self.provider.pk)
318
319        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
320        self.assertEqual(len(delete_reqs), 1)

Stale group cleanup logs and retries on transient HTTP errors

@Mocker()
def test_sync_cleanup_stale_group_dry_run(self, mock: requests_mock.mocker.Mocker):
322    @Mocker()
323    def test_sync_cleanup_stale_group_dry_run(self, mock: Mocker):
324        """Stale group cleanup skips HTTP DELETE in dry_run mode"""
325        self.provider.dry_run = True
326        self.provider.save()
327        scim_id = generate_id()
328        mock.get("https://localhost/ServiceProviderConfig", json={})
329        self._create_stale_provider_group(scim_id)
330
331        scim_sync.send(self.provider.pk)
332
333        delete_reqs = [r for r in mock.request_history if r.method == "DELETE"]
334        self.assertEqual(len(delete_reqs), 0)

Stale group cleanup skips HTTP DELETE in dry_run mode