authentik.sources.ldap.tests.test_sync

LDAP Source tests

  1"""LDAP Source tests"""
  2
  3from unittest.mock import MagicMock, patch
  4
  5from django.db.models import Q
  6from django.test import TestCase
  7from ldap3.core.exceptions import LDAPInvalidFilterError
  8from ldap3.utils.conv import escape_filter_chars
  9
 10from authentik.blueprints.tests import apply_blueprint
 11from authentik.core.models import Group, User
 12from authentik.core.tests.utils import create_test_admin_user
 13from authentik.events.models import Event, EventAction
 14from authentik.lib.generators import generate_id, generate_key
 15from authentik.lib.sync.outgoing.exceptions import StopSync
 16from authentik.lib.utils.reflection import class_to_path
 17from authentik.sources.ldap.models import (
 18    GroupLDAPSourceConnection,
 19    LDAPSource,
 20    LDAPSourcePropertyMapping,
 21    UserLDAPSourceConnection,
 22)
 23from authentik.sources.ldap.sync.forward_delete_users import DELETE_CHUNK_SIZE
 24from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
 25from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
 26from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
 27from authentik.sources.ldap.tasks import ldap_sync, ldap_sync_page
 28from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
 29from authentik.sources.ldap.tests.mock_freeipa import mock_freeipa_connection
 30from authentik.sources.ldap.tests.mock_slapd import (
 31    group_in_slapd_cn,
 32    group_in_slapd_uid,
 33    mock_slapd_connection,
 34    user_in_slapd_cn,
 35    user_in_slapd_uid,
 36)
 37from authentik.tasks.models import Task
 38
 39LDAP_PASSWORD = generate_key()
 40
 41
 42class LDAPSyncTests(TestCase):
 43    """LDAP Sync tests"""
 44
 45    @apply_blueprint("system/sources-ldap.yaml")
 46    def setUp(self):
 47        self.source: LDAPSource = LDAPSource.objects.create(
 48            name="ldap",
 49            slug="ldap",
 50            base_dn="dc=goauthentik,dc=io",
 51            additional_user_dn="ou=users",
 52            additional_group_dn="ou=groups",
 53        )
 54
 55    def test_sync_missing_page(self):
 56        """Test sync with missing page"""
 57        connection = MagicMock(return_value=mock_ad_connection())
 58        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 59            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")
 60
 61    def test_sync_error(self):
 62        """Test user sync"""
 63        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 64        self.source.additional_user_dn = ""
 65        self.source.additional_group_dn = ""
 66        self.source.save()
 67        self.source.user_property_mappings.set(
 68            LDAPSourcePropertyMapping.objects.filter(
 69                Q(managed__startswith="goauthentik.io/sources/ldap/default")
 70                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
 71            )
 72        )
 73        mapping = LDAPSourcePropertyMapping.objects.create(
 74            name="name",
 75            expression="q",
 76        )
 77        self.source.user_property_mappings.set([mapping])
 78        self.source.save()
 79        connection = MagicMock(return_value=mock_ad_connection())
 80        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 81            user_sync = UserLDAPSynchronizer(self.source, Task())
 82            with self.assertRaises(StopSync):
 83                user_sync.sync_full()
 84            self.assertFalse(User.objects.filter(username="user0_sn").exists())
 85            self.assertFalse(User.objects.filter(username="user1_sn").exists())
 86        events = Event.objects.filter(
 87            action=EventAction.CONFIGURATION_ERROR,
 88            context__message="Failed to evaluate property mapping: 'name'",
 89            context__mapping__pk=mapping.pk.hex,
 90        )
 91        self.assertTrue(events.exists())
 92
 93    def test_sync_mapping(self):
 94        """Test property mappings"""
 95        none = LDAPSourcePropertyMapping.objects.create(
 96            name=generate_id(), expression="return None"
 97        )
 98        byte_mapping = LDAPSourcePropertyMapping.objects.create(
 99            name=generate_id(), expression="return b''"
100        )
101        self.source.user_property_mappings.set(
102            LDAPSourcePropertyMapping.objects.filter(
103                Q(managed__startswith="goauthentik.io/sources/ldap/default")
104                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
105            )
106        )
107        self.source.user_property_mappings.add(none, byte_mapping)
108        connection = MagicMock(return_value=mock_ad_connection())
109
110        # we basically just test that the mappings don't throw errors
111        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
112            user_sync = UserLDAPSynchronizer(self.source, Task())
113            user_sync.sync_full()
114
115    def test_sync_users_ad(self):
116        """Test user sync"""
117        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
118        self.source.additional_user_dn = ""
119        self.source.additional_group_dn = ""
120        self.source.save()
121        self.source.user_property_mappings.set(
122            LDAPSourcePropertyMapping.objects.filter(
123                Q(managed__startswith="goauthentik.io/sources/ldap/default")
124                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
125            )
126        )
127        connection = MagicMock(return_value=mock_ad_connection())
128
129        # Create the user beforehand so we can set attributes and check they aren't removed
130        user = User.objects.create(
131            username="erin.h",
132            attributes={
133                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
134                "foo": "bar",
135            },
136        )
137
138        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
139            user_sync = UserLDAPSynchronizer(self.source, Task())
140            user_sync.sync_full()
141
142            user.refresh_from_db()
143            self.assertEqual(user.name, "Erin M. Hagens")
144            self.assertEqual(user.attributes["foo"], "bar")
145            self.assertTrue(user.is_active)
146            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
147
148            deactivated = User.objects.filter(username="deactivated.a").first()
149            self.assertIsNotNone(deactivated)
150            self.assertFalse(deactivated.is_active)
151
152    def test_sync_users_openldap(self):
153        """Test user sync"""
154        self.source.object_uniqueness_field = "uid"
155        self.source.user_property_mappings.set(
156            LDAPSourcePropertyMapping.objects.filter(
157                Q(managed__startswith="goauthentik.io/sources/ldap/default")
158                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
159            )
160        )
161        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
162        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
163            user_sync = UserLDAPSynchronizer(self.source, Task())
164            user_sync.sync_full()
165            self.assertTrue(User.objects.filter(username="user0_sn").exists())
166            self.assertFalse(User.objects.filter(username="user1_sn").exists())
167
168    def test_sync_users_freeipa_ish(self):
169        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
170        self.source.object_uniqueness_field = "uid"
171        self.source.user_property_mappings.set(
172            LDAPSourcePropertyMapping.objects.filter(
173                Q(managed__startswith="goauthentik.io/sources/ldap/default")
174                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
175            )
176        )
177        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
178        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
179            user_sync = UserLDAPSynchronizer(self.source, Task())
180            user_sync.sync_full()
181            self.assertTrue(User.objects.filter(username="user0_sn").exists())
182            self.assertFalse(User.objects.filter(username="user1_sn").exists())
183            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)
184
185    def test_sync_groups_freeipa_memberOf(self):
186        """Test group sync when membership is derived from memberOf user attribute"""
187        self.source.object_uniqueness_field = "uid"
188        self.source.group_object_filter = "(objectClass=groupOfNames)"
189        self.source.lookup_groups_from_user = True
190        self.source.group_membership_field = "memberOf"
191        self.source.user_property_mappings.set(
192            LDAPSourcePropertyMapping.objects.filter(
193                Q(managed__startswith="goauthentik.io/sources/ldap/default")
194                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
195            )
196        )
197        self.source.group_property_mappings.set(
198            LDAPSourcePropertyMapping.objects.filter(
199                managed="goauthentik.io/sources/ldap/openldap-cn"
200            )
201        )
202        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
203        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
204            user_sync = UserLDAPSynchronizer(self.source, Task())
205            user_sync.sync_full()
206            group_sync = GroupLDAPSynchronizer(self.source, Task())
207            group_sync.sync_full()
208            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
209            membership_sync.sync_full()
210
211            self.assertTrue(
212                User.objects.filter(username="user4_sn").exists(), "User does not exist"
213            )
214            # Test if membership mapping based on memberOf works.
215            memberof_group = Group.objects.filter(name="reverse-lookup-group")
216            self.assertTrue(memberof_group.exists(), "Group does not exist")
217            self.assertTrue(
218                memberof_group.first().users.filter(username="user4_sn").exists(),
219                "User not a member of the group",
220            )
221
222    def test_sync_groups_ad(self):
223        """Test group sync"""
224        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
225        self.source.additional_user_dn = ""
226        self.source.additional_group_dn = ""
227        self.source.save()
228        self.source.user_property_mappings.set(
229            LDAPSourcePropertyMapping.objects.filter(
230                Q(managed__startswith="goauthentik.io/sources/ldap/default")
231                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
232            )
233        )
234        self.source.group_property_mappings.set(
235            LDAPSourcePropertyMapping.objects.filter(
236                managed="goauthentik.io/sources/ldap/default-name"
237            )
238        )
239        connection = MagicMock(return_value=mock_ad_connection())
240        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
241            _user = create_test_admin_user()
242            parent_group = Group.objects.get(name=_user.username)
243            self.source.sync_parent_group = parent_group
244            self.source.save()
245            group_sync = GroupLDAPSynchronizer(self.source, Task())
246            group_sync.sync_full()
247            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
248            membership_sync.sync_full()
249            group: Group = Group.objects.filter(name="Test Group").first()
250            self.assertIsNotNone(group)
251            self.assertEqual(group.parents.first(), parent_group)
252
253    def test_sync_groups_openldap(self):
254        """Test group sync"""
255        self.source.object_uniqueness_field = "uid"
256        self.source.group_object_filter = "(objectClass=groupOfNames)"
257        self.source.user_property_mappings.set(
258            LDAPSourcePropertyMapping.objects.filter(
259                Q(managed__startswith="goauthentik.io/sources/ldap/default")
260                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
261            )
262        )
263        self.source.group_property_mappings.set(
264            LDAPSourcePropertyMapping.objects.filter(
265                managed="goauthentik.io/sources/ldap/openldap-cn"
266            )
267        )
268        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
269        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
270            self.source.save()
271            group_sync = GroupLDAPSynchronizer(self.source, Task())
272            group_sync.sync_full()
273            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
274            membership_sync.sync_full()
275            group = Group.objects.filter(name="group1")
276            self.assertTrue(group.exists())
277
278    def test_sync_groups_openldap_posix_group(self):
279        """Test posix group sync"""
280        self.source.object_uniqueness_field = "cn"
281        self.source.group_membership_field = "memberUid"
282        self.source.user_object_filter = "(objectClass=posixAccount)"
283        self.source.group_object_filter = "(objectClass=posixGroup)"
284        self.source.user_membership_attribute = "uid"
285        self.source.user_property_mappings.set(
286            [
287                *LDAPSourcePropertyMapping.objects.filter(
288                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
289                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
290                ).all(),
291                LDAPSourcePropertyMapping.objects.create(
292                    name="name",
293                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
294                ),
295            ]
296        )
297        self.source.group_property_mappings.set(
298            LDAPSourcePropertyMapping.objects.filter(
299                managed="goauthentik.io/sources/ldap/openldap-cn"
300            )
301        )
302        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
303        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
304            self.source.save()
305            user_sync = UserLDAPSynchronizer(self.source, Task())
306            user_sync.sync_full()
307            group_sync = GroupLDAPSynchronizer(self.source, Task())
308            group_sync.sync_full()
309            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
310            membership_sync.sync_full()
311            # Test if membership mapping based on memberUid works.
312            posix_group = Group.objects.filter(name="group-posix").first()
313            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
314
315    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
316        """Test posix group sync"""
317        self.source.object_uniqueness_field = "cn"
318        self.source.group_membership_field = "memberUid"
319        self.source.user_object_filter = "(objectClass=posixAccount)"
320        self.source.group_object_filter = "(objectClass=posixGroup)"
321        self.source.user_membership_attribute = "cn"
322        self.source.user_property_mappings.set(
323            [
324                *LDAPSourcePropertyMapping.objects.filter(
325                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
326                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
327                ).all(),
328                LDAPSourcePropertyMapping.objects.create(
329                    name="name",
330                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
331                ),
332            ]
333        )
334        self.source.group_property_mappings.set(
335            LDAPSourcePropertyMapping.objects.filter(
336                managed="goauthentik.io/sources/ldap/openldap-cn"
337            )
338        )
339        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
340        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
341            self.source.save()
342            user_sync = UserLDAPSynchronizer(self.source, Task())
343            user_sync.sync_full()
344            group_sync = GroupLDAPSynchronizer(self.source, Task())
345            group_sync.sync_full()
346            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
347            membership_sync.sync_full()
348            # Test if membership mapping based on memberUid works.
349            posix_group = Group.objects.filter(name="group-posix").first()
350            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
351
352    def test_tasks_ad(self):
353        """Test Scheduled tasks"""
354        self.source.user_property_mappings.set(
355            LDAPSourcePropertyMapping.objects.filter(
356                Q(managed__startswith="goauthentik.io/sources/ldap/default")
357                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
358            )
359        )
360        self.source.save()
361        connection = MagicMock(return_value=mock_ad_connection())
362        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
363            ldap_sync.send(self.source.pk)
364
365    def test_tasks_openldap(self):
366        """Test Scheduled tasks"""
367        self.source.object_uniqueness_field = "uid"
368        self.source.group_object_filter = "(objectClass=groupOfNames)"
369        self.source.user_property_mappings.set(
370            LDAPSourcePropertyMapping.objects.filter(
371                Q(managed__startswith="goauthentik.io/sources/ldap/default")
372                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
373            )
374        )
375        self.source.save()
376        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
377        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
378            ldap_sync.send(self.source.pk)
379
380    def test_user_deletion(self):
381        """Test user deletion"""
382        user = User.objects.create_user(username="not-in-the-source")
383        UserLDAPSourceConnection.objects.create(
384            user=user, source=self.source, identifier="not-in-the-source"
385        )
386        self.source.object_uniqueness_field = "uid"
387        self.source.group_object_filter = "(objectClass=groupOfNames)"
388        self.source.delete_not_found_objects = True
389        self.source.save()
390
391        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
392        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
393            ldap_sync.send(self.source.pk)
394        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())
395
396    def test_user_deletion_still_in_source(self):
397        """Test that user is not deleted if it's still in the source"""
398        username = user_in_slapd_cn
399        identifier = user_in_slapd_uid
400        user = User.objects.create_user(username=username)
401        UserLDAPSourceConnection.objects.create(
402            user=user, source=self.source, identifier=identifier
403        )
404        self.source.object_uniqueness_field = "uid"
405        self.source.group_object_filter = "(objectClass=groupOfNames)"
406        self.source.delete_not_found_objects = True
407        self.source.save()
408
409        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
410        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
411            ldap_sync.send(self.source.pk)
412        self.assertTrue(User.objects.filter(username=username).exists())
413
414    def test_user_deletion_no_sync(self):
415        """Test that user is not deleted if sync_users is False"""
416        user = User.objects.create_user(username="not-in-the-source")
417        UserLDAPSourceConnection.objects.create(
418            user=user, source=self.source, identifier="not-in-the-source"
419        )
420        self.source.object_uniqueness_field = "uid"
421        self.source.group_object_filter = "(objectClass=groupOfNames)"
422        self.source.delete_not_found_objects = True
423        self.source.sync_users = False
424        self.source.save()
425
426        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
427        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
428            ldap_sync.send(self.source.pk)
429        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
430
431    def test_user_deletion_no_delete(self):
432        """Test that user is not deleted if delete_not_found_objects is False"""
433        user = User.objects.create_user(username="not-in-the-source")
434        UserLDAPSourceConnection.objects.create(
435            user=user, source=self.source, identifier="not-in-the-source"
436        )
437        self.source.object_uniqueness_field = "uid"
438        self.source.group_object_filter = "(objectClass=groupOfNames)"
439        self.source.save()
440
441        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
442        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
443            ldap_sync.send(self.source.pk)
444        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
445
446    def test_group_deletion(self):
447        """Test group deletion"""
448        group = Group.objects.create(name="not-in-the-source")
449        GroupLDAPSourceConnection.objects.create(
450            group=group, source=self.source, identifier="not-in-the-source"
451        )
452        self.source.object_uniqueness_field = "uid"
453        self.source.group_object_filter = "(objectClass=groupOfNames)"
454        self.source.delete_not_found_objects = True
455        self.source.save()
456
457        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
458        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
459            ldap_sync.send(self.source.pk)
460        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())
461
462    def test_group_deletion_still_in_source(self):
463        """Test that group is not deleted if it's still in the source"""
464        groupname = group_in_slapd_cn
465        identifier = group_in_slapd_uid
466        group = Group.objects.create(name=groupname)
467        GroupLDAPSourceConnection.objects.create(
468            group=group, source=self.source, identifier=identifier
469        )
470        self.source.object_uniqueness_field = "uid"
471        self.source.group_object_filter = "(objectClass=groupOfNames)"
472        self.source.delete_not_found_objects = True
473        self.source.save()
474
475        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
476        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
477            ldap_sync.send(self.source.pk)
478        self.assertTrue(Group.objects.filter(name=groupname).exists())
479
480    def test_group_deletion_no_sync(self):
481        """Test that group is not deleted if sync_groups is False"""
482        group = Group.objects.create(name="not-in-the-source")
483        GroupLDAPSourceConnection.objects.create(
484            group=group, source=self.source, identifier="not-in-the-source"
485        )
486        self.source.object_uniqueness_field = "uid"
487        self.source.group_object_filter = "(objectClass=groupOfNames)"
488        self.source.delete_not_found_objects = True
489        self.source.sync_groups = False
490        self.source.save()
491
492        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
493        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
494            ldap_sync.send(self.source.pk)
495        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
496
497    def test_group_deletion_no_delete(self):
498        """Test that group is not deleted if delete_not_found_objects is False"""
499        group = Group.objects.create(name="not-in-the-source")
500        GroupLDAPSourceConnection.objects.create(
501            group=group, source=self.source, identifier="not-in-the-source"
502        )
503        self.source.object_uniqueness_field = "uid"
504        self.source.group_object_filter = "(objectClass=groupOfNames)"
505        self.source.save()
506
507        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
508        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
509            ldap_sync.send(self.source.pk)
510        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
511
512    def test_batch_deletion(self):
513        """Test batch deletion"""
514        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
515        for i in range(BATCH_SIZE):
516            user = User.objects.create_user(username=f"not-in-the-source-{i}")
517            group = Group.objects.create(name=f"not-in-the-source-{i}")
518            group.users.add(user)
519            UserLDAPSourceConnection.objects.create(
520                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
521            )
522            GroupLDAPSourceConnection.objects.create(
523                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
524            )
525        self.source.object_uniqueness_field = "uid"
526        self.source.group_object_filter = "(objectClass=groupOfNames)"
527        self.source.delete_not_found_objects = True
528        self.source.save()
529
530        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
531        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
532            ldap_sync.send(self.source.pk)
533
534        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
535        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())
536
537    def test_membership_sync_special_chars_in_group_dn(self):
538        """Test membership synchronization with special characters in group DN"""
539        self.source.object_uniqueness_field = "uid"
540        self.source.group_object_filter = "(objectClass=groupOfNames)"
541        self.source.lookup_groups_from_user = True
542        self.source.group_membership_field = "memberOf"
543
544        # Mock connection with group DN containing special characters
545        mock_conn = MagicMock()
546
547        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
548        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
549        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
550        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
551
552        # Mock the paged_search method that would be called with the filter
553        mock_standard = MagicMock()
554        mock_conn.extend.standard = mock_standard
555
556        # Test case 1: Group DN with parentheses
557        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
558            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
559
560            # Simulate group data with special characters in DN
561            page_data = [{"dn": special_group_dn}]
562
563            # This should not raise LDAPInvalidFilterError anymore
564            try:
565                membership_sync.sync(page_data)
566                # Verify that the filter was properly escaped
567                # The call should have been made with escaped characters
568                mock_standard.paged_search.assert_called()
569                call_args = mock_standard.paged_search.call_args
570                search_filter = call_args[1]["search_filter"]
571                # The parentheses should be escaped as \28 and \29
572                self.assertIn("\\28", search_filter)  # Escaped (
573                self.assertIn("\\29", search_filter)  # Escaped )
574            except LDAPInvalidFilterError:
575                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
576
577        # Test case 2: Group DN with backslashes
578        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
579            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
580            page_data = [{"dn": backslash_group_dn}]
581
582            try:
583                membership_sync.sync(page_data)
584                call_args = mock_standard.paged_search.call_args
585                search_filter = call_args[1]["search_filter"]
586                # The backslash should be escaped as \5c
587                self.assertIn("\\5c", search_filter)  # Escaped \
588            except LDAPInvalidFilterError:
589                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
590
591        # Test case 3: Group DN with asterisks
592        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
593            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
594            page_data = [{"dn": asterisk_group_dn}]
595
596            try:
597                membership_sync.sync(page_data)
598                call_args = mock_standard.paged_search.call_args
599                search_filter = call_args[1]["search_filter"]
600                # The asterisk should be escaped as \2a
601                self.assertIn("\\2a", search_filter)  # Escaped *
602            except LDAPInvalidFilterError:
603                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
604
605    def test_escape_filter_chars_function(self):
606        """Test the escape_filter_chars function directly"""
607
608        # Test various special characters that need escaping
609        test_cases = [
610            ("test(group)", "test\\28group\\29"),  # parentheses
611            ("test\\group", "test\\5cgroup"),  # backslash
612            ("test*group", "test\\2agroup"),  # asterisk
613            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
614            ("normalgroup", "normalgroup"),  # no special chars
615            ("", ""),  # empty string
616        ]
617
618        for input_str, expected in test_cases:
619            with self.subTest(input_str=input_str):
620                result = escape_filter_chars(input_str)
621                self.assertEqual(result, expected)
LDAP_PASSWORD = '.iAh(*NZp@d_kDrp"pea)0P2wlm4#iAz39zroryvpg`@6v5,Tmh8Nd7}XdRsk!UJj0W2Eai\'Oi=w_r$zxX:>.@FYD}<Fr5$9UxepU[w*Mg,W";q.s3=mFsgloE]>|HwC'
class LDAPSyncTests(django.test.testcases.TestCase):
 43class LDAPSyncTests(TestCase):
 44    """LDAP Sync tests"""
 45
 46    @apply_blueprint("system/sources-ldap.yaml")
 47    def setUp(self):
 48        self.source: LDAPSource = LDAPSource.objects.create(
 49            name="ldap",
 50            slug="ldap",
 51            base_dn="dc=goauthentik,dc=io",
 52            additional_user_dn="ou=users",
 53            additional_group_dn="ou=groups",
 54        )
 55
 56    def test_sync_missing_page(self):
 57        """Test sync with missing page"""
 58        connection = MagicMock(return_value=mock_ad_connection())
 59        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 60            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")
 61
 62    def test_sync_error(self):
 63        """Test user sync"""
 64        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 65        self.source.additional_user_dn = ""
 66        self.source.additional_group_dn = ""
 67        self.source.save()
 68        self.source.user_property_mappings.set(
 69            LDAPSourcePropertyMapping.objects.filter(
 70                Q(managed__startswith="goauthentik.io/sources/ldap/default")
 71                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
 72            )
 73        )
 74        mapping = LDAPSourcePropertyMapping.objects.create(
 75            name="name",
 76            expression="q",
 77        )
 78        self.source.user_property_mappings.set([mapping])
 79        self.source.save()
 80        connection = MagicMock(return_value=mock_ad_connection())
 81        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 82            user_sync = UserLDAPSynchronizer(self.source, Task())
 83            with self.assertRaises(StopSync):
 84                user_sync.sync_full()
 85            self.assertFalse(User.objects.filter(username="user0_sn").exists())
 86            self.assertFalse(User.objects.filter(username="user1_sn").exists())
 87        events = Event.objects.filter(
 88            action=EventAction.CONFIGURATION_ERROR,
 89            context__message="Failed to evaluate property mapping: 'name'",
 90            context__mapping__pk=mapping.pk.hex,
 91        )
 92        self.assertTrue(events.exists())
 93
 94    def test_sync_mapping(self):
 95        """Test property mappings"""
 96        none = LDAPSourcePropertyMapping.objects.create(
 97            name=generate_id(), expression="return None"
 98        )
 99        byte_mapping = LDAPSourcePropertyMapping.objects.create(
100            name=generate_id(), expression="return b''"
101        )
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(managed__startswith="goauthentik.io/sources/ldap/default")
105                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
106            )
107        )
108        self.source.user_property_mappings.add(none, byte_mapping)
109        connection = MagicMock(return_value=mock_ad_connection())
110
111        # we basically just test that the mappings don't throw errors
112        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
113            user_sync = UserLDAPSynchronizer(self.source, Task())
114            user_sync.sync_full()
115
116    def test_sync_users_ad(self):
117        """Test user sync"""
118        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
119        self.source.additional_user_dn = ""
120        self.source.additional_group_dn = ""
121        self.source.save()
122        self.source.user_property_mappings.set(
123            LDAPSourcePropertyMapping.objects.filter(
124                Q(managed__startswith="goauthentik.io/sources/ldap/default")
125                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
126            )
127        )
128        connection = MagicMock(return_value=mock_ad_connection())
129
130        # Create the user beforehand so we can set attributes and check they aren't removed
131        user = User.objects.create(
132            username="erin.h",
133            attributes={
134                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
135                "foo": "bar",
136            },
137        )
138
139        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
140            user_sync = UserLDAPSynchronizer(self.source, Task())
141            user_sync.sync_full()
142
143            user.refresh_from_db()
144            self.assertEqual(user.name, "Erin M. Hagens")
145            self.assertEqual(user.attributes["foo"], "bar")
146            self.assertTrue(user.is_active)
147            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
148
149            deactivated = User.objects.filter(username="deactivated.a").first()
150            self.assertIsNotNone(deactivated)
151            self.assertFalse(deactivated.is_active)
152
153    def test_sync_users_openldap(self):
154        """Test user sync"""
155        self.source.object_uniqueness_field = "uid"
156        self.source.user_property_mappings.set(
157            LDAPSourcePropertyMapping.objects.filter(
158                Q(managed__startswith="goauthentik.io/sources/ldap/default")
159                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
160            )
161        )
162        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
163        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
164            user_sync = UserLDAPSynchronizer(self.source, Task())
165            user_sync.sync_full()
166            self.assertTrue(User.objects.filter(username="user0_sn").exists())
167            self.assertFalse(User.objects.filter(username="user1_sn").exists())
168
169    def test_sync_users_freeipa_ish(self):
170        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
171        self.source.object_uniqueness_field = "uid"
172        self.source.user_property_mappings.set(
173            LDAPSourcePropertyMapping.objects.filter(
174                Q(managed__startswith="goauthentik.io/sources/ldap/default")
175                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
176            )
177        )
178        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
179        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
180            user_sync = UserLDAPSynchronizer(self.source, Task())
181            user_sync.sync_full()
182            self.assertTrue(User.objects.filter(username="user0_sn").exists())
183            self.assertFalse(User.objects.filter(username="user1_sn").exists())
184            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)
185
186    def test_sync_groups_freeipa_memberOf(self):
187        """Test group sync when membership is derived from memberOf user attribute"""
188        self.source.object_uniqueness_field = "uid"
189        self.source.group_object_filter = "(objectClass=groupOfNames)"
190        self.source.lookup_groups_from_user = True
191        self.source.group_membership_field = "memberOf"
192        self.source.user_property_mappings.set(
193            LDAPSourcePropertyMapping.objects.filter(
194                Q(managed__startswith="goauthentik.io/sources/ldap/default")
195                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
196            )
197        )
198        self.source.group_property_mappings.set(
199            LDAPSourcePropertyMapping.objects.filter(
200                managed="goauthentik.io/sources/ldap/openldap-cn"
201            )
202        )
203        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
204        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
205            user_sync = UserLDAPSynchronizer(self.source, Task())
206            user_sync.sync_full()
207            group_sync = GroupLDAPSynchronizer(self.source, Task())
208            group_sync.sync_full()
209            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
210            membership_sync.sync_full()
211
212            self.assertTrue(
213                User.objects.filter(username="user4_sn").exists(), "User does not exist"
214            )
215            # Test if membership mapping based on memberOf works.
216            memberof_group = Group.objects.filter(name="reverse-lookup-group")
217            self.assertTrue(memberof_group.exists(), "Group does not exist")
218            self.assertTrue(
219                memberof_group.first().users.filter(username="user4_sn").exists(),
220                "User not a member of the group",
221            )
222
223    def test_sync_groups_ad(self):
224        """Test group sync"""
225        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
226        self.source.additional_user_dn = ""
227        self.source.additional_group_dn = ""
228        self.source.save()
229        self.source.user_property_mappings.set(
230            LDAPSourcePropertyMapping.objects.filter(
231                Q(managed__startswith="goauthentik.io/sources/ldap/default")
232                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
233            )
234        )
235        self.source.group_property_mappings.set(
236            LDAPSourcePropertyMapping.objects.filter(
237                managed="goauthentik.io/sources/ldap/default-name"
238            )
239        )
240        connection = MagicMock(return_value=mock_ad_connection())
241        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
242            _user = create_test_admin_user()
243            parent_group = Group.objects.get(name=_user.username)
244            self.source.sync_parent_group = parent_group
245            self.source.save()
246            group_sync = GroupLDAPSynchronizer(self.source, Task())
247            group_sync.sync_full()
248            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
249            membership_sync.sync_full()
250            group: Group = Group.objects.filter(name="Test Group").first()
251            self.assertIsNotNone(group)
252            self.assertEqual(group.parents.first(), parent_group)
253
254    def test_sync_groups_openldap(self):
255        """Test group sync"""
256        self.source.object_uniqueness_field = "uid"
257        self.source.group_object_filter = "(objectClass=groupOfNames)"
258        self.source.user_property_mappings.set(
259            LDAPSourcePropertyMapping.objects.filter(
260                Q(managed__startswith="goauthentik.io/sources/ldap/default")
261                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
262            )
263        )
264        self.source.group_property_mappings.set(
265            LDAPSourcePropertyMapping.objects.filter(
266                managed="goauthentik.io/sources/ldap/openldap-cn"
267            )
268        )
269        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
270        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
271            self.source.save()
272            group_sync = GroupLDAPSynchronizer(self.source, Task())
273            group_sync.sync_full()
274            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
275            membership_sync.sync_full()
276            group = Group.objects.filter(name="group1")
277            self.assertTrue(group.exists())
278
279    def test_sync_groups_openldap_posix_group(self):
280        """Test posix group sync"""
281        self.source.object_uniqueness_field = "cn"
282        self.source.group_membership_field = "memberUid"
283        self.source.user_object_filter = "(objectClass=posixAccount)"
284        self.source.group_object_filter = "(objectClass=posixGroup)"
285        self.source.user_membership_attribute = "uid"
286        self.source.user_property_mappings.set(
287            [
288                *LDAPSourcePropertyMapping.objects.filter(
289                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
290                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
291                ).all(),
292                LDAPSourcePropertyMapping.objects.create(
293                    name="name",
294                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
295                ),
296            ]
297        )
298        self.source.group_property_mappings.set(
299            LDAPSourcePropertyMapping.objects.filter(
300                managed="goauthentik.io/sources/ldap/openldap-cn"
301            )
302        )
303        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
304        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
305            self.source.save()
306            user_sync = UserLDAPSynchronizer(self.source, Task())
307            user_sync.sync_full()
308            group_sync = GroupLDAPSynchronizer(self.source, Task())
309            group_sync.sync_full()
310            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
311            membership_sync.sync_full()
312            # Test if membership mapping based on memberUid works.
313            posix_group = Group.objects.filter(name="group-posix").first()
314            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
315
316    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
317        """Test posix group sync"""
318        self.source.object_uniqueness_field = "cn"
319        self.source.group_membership_field = "memberUid"
320        self.source.user_object_filter = "(objectClass=posixAccount)"
321        self.source.group_object_filter = "(objectClass=posixGroup)"
322        self.source.user_membership_attribute = "cn"
323        self.source.user_property_mappings.set(
324            [
325                *LDAPSourcePropertyMapping.objects.filter(
326                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
327                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
328                ).all(),
329                LDAPSourcePropertyMapping.objects.create(
330                    name="name",
331                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
332                ),
333            ]
334        )
335        self.source.group_property_mappings.set(
336            LDAPSourcePropertyMapping.objects.filter(
337                managed="goauthentik.io/sources/ldap/openldap-cn"
338            )
339        )
340        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
341        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
342            self.source.save()
343            user_sync = UserLDAPSynchronizer(self.source, Task())
344            user_sync.sync_full()
345            group_sync = GroupLDAPSynchronizer(self.source, Task())
346            group_sync.sync_full()
347            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
348            membership_sync.sync_full()
349            # Test if membership mapping based on memberUid works.
350            posix_group = Group.objects.filter(name="group-posix").first()
351            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
352
353    def test_tasks_ad(self):
354        """Test Scheduled tasks"""
355        self.source.user_property_mappings.set(
356            LDAPSourcePropertyMapping.objects.filter(
357                Q(managed__startswith="goauthentik.io/sources/ldap/default")
358                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
359            )
360        )
361        self.source.save()
362        connection = MagicMock(return_value=mock_ad_connection())
363        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
364            ldap_sync.send(self.source.pk)
365
366    def test_tasks_openldap(self):
367        """Test Scheduled tasks"""
368        self.source.object_uniqueness_field = "uid"
369        self.source.group_object_filter = "(objectClass=groupOfNames)"
370        self.source.user_property_mappings.set(
371            LDAPSourcePropertyMapping.objects.filter(
372                Q(managed__startswith="goauthentik.io/sources/ldap/default")
373                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
374            )
375        )
376        self.source.save()
377        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
378        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
379            ldap_sync.send(self.source.pk)
380
381    def test_user_deletion(self):
382        """Test user deletion"""
383        user = User.objects.create_user(username="not-in-the-source")
384        UserLDAPSourceConnection.objects.create(
385            user=user, source=self.source, identifier="not-in-the-source"
386        )
387        self.source.object_uniqueness_field = "uid"
388        self.source.group_object_filter = "(objectClass=groupOfNames)"
389        self.source.delete_not_found_objects = True
390        self.source.save()
391
392        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
393        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
394            ldap_sync.send(self.source.pk)
395        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())
396
397    def test_user_deletion_still_in_source(self):
398        """Test that user is not deleted if it's still in the source"""
399        username = user_in_slapd_cn
400        identifier = user_in_slapd_uid
401        user = User.objects.create_user(username=username)
402        UserLDAPSourceConnection.objects.create(
403            user=user, source=self.source, identifier=identifier
404        )
405        self.source.object_uniqueness_field = "uid"
406        self.source.group_object_filter = "(objectClass=groupOfNames)"
407        self.source.delete_not_found_objects = True
408        self.source.save()
409
410        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
411        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
412            ldap_sync.send(self.source.pk)
413        self.assertTrue(User.objects.filter(username=username).exists())
414
415    def test_user_deletion_no_sync(self):
416        """Test that user is not deleted if sync_users is False"""
417        user = User.objects.create_user(username="not-in-the-source")
418        UserLDAPSourceConnection.objects.create(
419            user=user, source=self.source, identifier="not-in-the-source"
420        )
421        self.source.object_uniqueness_field = "uid"
422        self.source.group_object_filter = "(objectClass=groupOfNames)"
423        self.source.delete_not_found_objects = True
424        self.source.sync_users = False
425        self.source.save()
426
427        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
428        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
429            ldap_sync.send(self.source.pk)
430        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
431
432    def test_user_deletion_no_delete(self):
433        """Test that user is not deleted if delete_not_found_objects is False"""
434        user = User.objects.create_user(username="not-in-the-source")
435        UserLDAPSourceConnection.objects.create(
436            user=user, source=self.source, identifier="not-in-the-source"
437        )
438        self.source.object_uniqueness_field = "uid"
439        self.source.group_object_filter = "(objectClass=groupOfNames)"
440        self.source.save()
441
442        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
443        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
444            ldap_sync.send(self.source.pk)
445        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
446
447    def test_group_deletion(self):
448        """Test group deletion"""
449        group = Group.objects.create(name="not-in-the-source")
450        GroupLDAPSourceConnection.objects.create(
451            group=group, source=self.source, identifier="not-in-the-source"
452        )
453        self.source.object_uniqueness_field = "uid"
454        self.source.group_object_filter = "(objectClass=groupOfNames)"
455        self.source.delete_not_found_objects = True
456        self.source.save()
457
458        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
459        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
460            ldap_sync.send(self.source.pk)
461        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())
462
463    def test_group_deletion_still_in_source(self):
464        """Test that group is not deleted if it's still in the source"""
465        groupname = group_in_slapd_cn
466        identifier = group_in_slapd_uid
467        group = Group.objects.create(name=groupname)
468        GroupLDAPSourceConnection.objects.create(
469            group=group, source=self.source, identifier=identifier
470        )
471        self.source.object_uniqueness_field = "uid"
472        self.source.group_object_filter = "(objectClass=groupOfNames)"
473        self.source.delete_not_found_objects = True
474        self.source.save()
475
476        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
477        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
478            ldap_sync.send(self.source.pk)
479        self.assertTrue(Group.objects.filter(name=groupname).exists())
480
481    def test_group_deletion_no_sync(self):
482        """Test that group is not deleted if sync_groups is False"""
483        group = Group.objects.create(name="not-in-the-source")
484        GroupLDAPSourceConnection.objects.create(
485            group=group, source=self.source, identifier="not-in-the-source"
486        )
487        self.source.object_uniqueness_field = "uid"
488        self.source.group_object_filter = "(objectClass=groupOfNames)"
489        self.source.delete_not_found_objects = True
490        self.source.sync_groups = False
491        self.source.save()
492
493        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
494        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
495            ldap_sync.send(self.source.pk)
496        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
497
498    def test_group_deletion_no_delete(self):
499        """Test that group is not deleted if delete_not_found_objects is False"""
500        group = Group.objects.create(name="not-in-the-source")
501        GroupLDAPSourceConnection.objects.create(
502            group=group, source=self.source, identifier="not-in-the-source"
503        )
504        self.source.object_uniqueness_field = "uid"
505        self.source.group_object_filter = "(objectClass=groupOfNames)"
506        self.source.save()
507
508        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
509        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
510            ldap_sync.send(self.source.pk)
511        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
512
513    def test_batch_deletion(self):
514        """Test batch deletion"""
515        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
516        for i in range(BATCH_SIZE):
517            user = User.objects.create_user(username=f"not-in-the-source-{i}")
518            group = Group.objects.create(name=f"not-in-the-source-{i}")
519            group.users.add(user)
520            UserLDAPSourceConnection.objects.create(
521                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
522            )
523            GroupLDAPSourceConnection.objects.create(
524                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
525            )
526        self.source.object_uniqueness_field = "uid"
527        self.source.group_object_filter = "(objectClass=groupOfNames)"
528        self.source.delete_not_found_objects = True
529        self.source.save()
530
531        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
532        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
533            ldap_sync.send(self.source.pk)
534
535        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
536        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())
537
538    def test_membership_sync_special_chars_in_group_dn(self):
539        """Test membership synchronization with special characters in group DN"""
540        self.source.object_uniqueness_field = "uid"
541        self.source.group_object_filter = "(objectClass=groupOfNames)"
542        self.source.lookup_groups_from_user = True
543        self.source.group_membership_field = "memberOf"
544
545        # Mock connection with group DN containing special characters
546        mock_conn = MagicMock()
547
548        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
549        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
550        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
551        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
552
553        # Mock the paged_search method that would be called with the filter
554        mock_standard = MagicMock()
555        mock_conn.extend.standard = mock_standard
556
557        # Test case 1: Group DN with parentheses
558        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
559            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
560
561            # Simulate group data with special characters in DN
562            page_data = [{"dn": special_group_dn}]
563
564            # This should not raise LDAPInvalidFilterError anymore
565            try:
566                membership_sync.sync(page_data)
567                # Verify that the filter was properly escaped
568                # The call should have been made with escaped characters
569                mock_standard.paged_search.assert_called()
570                call_args = mock_standard.paged_search.call_args
571                search_filter = call_args[1]["search_filter"]
572                # The parentheses should be escaped as \28 and \29
573                self.assertIn("\\28", search_filter)  # Escaped (
574                self.assertIn("\\29", search_filter)  # Escaped )
575            except LDAPInvalidFilterError:
576                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
577
578        # Test case 2: Group DN with backslashes
579        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
580            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
581            page_data = [{"dn": backslash_group_dn}]
582
583            try:
584                membership_sync.sync(page_data)
585                call_args = mock_standard.paged_search.call_args
586                search_filter = call_args[1]["search_filter"]
587                # The backslash should be escaped as \5c
588                self.assertIn("\\5c", search_filter)  # Escaped \
589            except LDAPInvalidFilterError:
590                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
591
592        # Test case 3: Group DN with asterisks
593        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
594            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
595            page_data = [{"dn": asterisk_group_dn}]
596
597            try:
598                membership_sync.sync(page_data)
599                call_args = mock_standard.paged_search.call_args
600                search_filter = call_args[1]["search_filter"]
601                # The asterisk should be escaped as \2a
602                self.assertIn("\\2a", search_filter)  # Escaped *
603            except LDAPInvalidFilterError:
604                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
605
606    def test_escape_filter_chars_function(self):
607        """Test the escape_filter_chars function directly"""
608
609        # Test various special characters that need escaping
610        test_cases = [
611            ("test(group)", "test\\28group\\29"),  # parentheses
612            ("test\\group", "test\\5cgroup"),  # backslash
613            ("test*group", "test\\2agroup"),  # asterisk
614            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
615            ("normalgroup", "normalgroup"),  # no special chars
616            ("", ""),  # empty string
617        ]
618
619        for input_str, expected in test_cases:
620            with self.subTest(input_str=input_str):
621                result = escape_filter_chars(input_str)
622                self.assertEqual(result, expected)

LDAP Sync tests

@apply_blueprint('system/sources-ldap.yaml')
def setUp(self):
46    @apply_blueprint("system/sources-ldap.yaml")
47    def setUp(self):
48        self.source: LDAPSource = LDAPSource.objects.create(
49            name="ldap",
50            slug="ldap",
51            base_dn="dc=goauthentik,dc=io",
52            additional_user_dn="ou=users",
53            additional_group_dn="ou=groups",
54        )

Hook method for setting up the test fixture before exercising it.

def test_sync_missing_page(self):
56    def test_sync_missing_page(self):
57        """Test sync with missing page"""
58        connection = MagicMock(return_value=mock_ad_connection())
59        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
60            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")

Test sync with missing page

def test_sync_error(self):
62    def test_sync_error(self):
63        """Test user sync"""
64        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
65        self.source.additional_user_dn = ""
66        self.source.additional_group_dn = ""
67        self.source.save()
68        self.source.user_property_mappings.set(
69            LDAPSourcePropertyMapping.objects.filter(
70                Q(managed__startswith="goauthentik.io/sources/ldap/default")
71                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
72            )
73        )
74        mapping = LDAPSourcePropertyMapping.objects.create(
75            name="name",
76            expression="q",
77        )
78        self.source.user_property_mappings.set([mapping])
79        self.source.save()
80        connection = MagicMock(return_value=mock_ad_connection())
81        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
82            user_sync = UserLDAPSynchronizer(self.source, Task())
83            with self.assertRaises(StopSync):
84                user_sync.sync_full()
85            self.assertFalse(User.objects.filter(username="user0_sn").exists())
86            self.assertFalse(User.objects.filter(username="user1_sn").exists())
87        events = Event.objects.filter(
88            action=EventAction.CONFIGURATION_ERROR,
89            context__message="Failed to evaluate property mapping: 'name'",
90            context__mapping__pk=mapping.pk.hex,
91        )
92        self.assertTrue(events.exists())

Test user sync

def test_sync_mapping(self):
 94    def test_sync_mapping(self):
 95        """Test property mappings"""
 96        none = LDAPSourcePropertyMapping.objects.create(
 97            name=generate_id(), expression="return None"
 98        )
 99        byte_mapping = LDAPSourcePropertyMapping.objects.create(
100            name=generate_id(), expression="return b''"
101        )
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(managed__startswith="goauthentik.io/sources/ldap/default")
105                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
106            )
107        )
108        self.source.user_property_mappings.add(none, byte_mapping)
109        connection = MagicMock(return_value=mock_ad_connection())
110
111        # we basically just test that the mappings don't throw errors
112        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
113            user_sync = UserLDAPSynchronizer(self.source, Task())
114            user_sync.sync_full()

Test property mappings

def test_sync_users_ad(self):
116    def test_sync_users_ad(self):
117        """Test user sync"""
118        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
119        self.source.additional_user_dn = ""
120        self.source.additional_group_dn = ""
121        self.source.save()
122        self.source.user_property_mappings.set(
123            LDAPSourcePropertyMapping.objects.filter(
124                Q(managed__startswith="goauthentik.io/sources/ldap/default")
125                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
126            )
127        )
128        connection = MagicMock(return_value=mock_ad_connection())
129
130        # Create the user beforehand so we can set attributes and check they aren't removed
131        user = User.objects.create(
132            username="erin.h",
133            attributes={
134                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
135                "foo": "bar",
136            },
137        )
138
139        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
140            user_sync = UserLDAPSynchronizer(self.source, Task())
141            user_sync.sync_full()
142
143            user.refresh_from_db()
144            self.assertEqual(user.name, "Erin M. Hagens")
145            self.assertEqual(user.attributes["foo"], "bar")
146            self.assertTrue(user.is_active)
147            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
148
149            deactivated = User.objects.filter(username="deactivated.a").first()
150            self.assertIsNotNone(deactivated)
151            self.assertFalse(deactivated.is_active)

Test user sync

def test_sync_users_openldap(self):
153    def test_sync_users_openldap(self):
154        """Test user sync"""
155        self.source.object_uniqueness_field = "uid"
156        self.source.user_property_mappings.set(
157            LDAPSourcePropertyMapping.objects.filter(
158                Q(managed__startswith="goauthentik.io/sources/ldap/default")
159                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
160            )
161        )
162        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
163        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
164            user_sync = UserLDAPSynchronizer(self.source, Task())
165            user_sync.sync_full()
166            self.assertTrue(User.objects.filter(username="user0_sn").exists())
167            self.assertFalse(User.objects.filter(username="user1_sn").exists())

Test user sync

def test_sync_users_freeipa_ish(self):
169    def test_sync_users_freeipa_ish(self):
170        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
171        self.source.object_uniqueness_field = "uid"
172        self.source.user_property_mappings.set(
173            LDAPSourcePropertyMapping.objects.filter(
174                Q(managed__startswith="goauthentik.io/sources/ldap/default")
175                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
176            )
177        )
178        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
179        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
180            user_sync = UserLDAPSynchronizer(self.source, Task())
181            user_sync.sync_full()
182            self.assertTrue(User.objects.filter(username="user0_sn").exists())
183            self.assertFalse(User.objects.filter(username="user1_sn").exists())
184            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)

Test user sync (FreeIPA-ish), mainly testing vendor quirks

def test_sync_groups_freeipa_memberOf(self):
186    def test_sync_groups_freeipa_memberOf(self):
187        """Test group sync when membership is derived from memberOf user attribute"""
188        self.source.object_uniqueness_field = "uid"
189        self.source.group_object_filter = "(objectClass=groupOfNames)"
190        self.source.lookup_groups_from_user = True
191        self.source.group_membership_field = "memberOf"
192        self.source.user_property_mappings.set(
193            LDAPSourcePropertyMapping.objects.filter(
194                Q(managed__startswith="goauthentik.io/sources/ldap/default")
195                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
196            )
197        )
198        self.source.group_property_mappings.set(
199            LDAPSourcePropertyMapping.objects.filter(
200                managed="goauthentik.io/sources/ldap/openldap-cn"
201            )
202        )
203        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
204        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
205            user_sync = UserLDAPSynchronizer(self.source, Task())
206            user_sync.sync_full()
207            group_sync = GroupLDAPSynchronizer(self.source, Task())
208            group_sync.sync_full()
209            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
210            membership_sync.sync_full()
211
212            self.assertTrue(
213                User.objects.filter(username="user4_sn").exists(), "User does not exist"
214            )
215            # Test if membership mapping based on memberOf works.
216            memberof_group = Group.objects.filter(name="reverse-lookup-group")
217            self.assertTrue(memberof_group.exists(), "Group does not exist")
218            self.assertTrue(
219                memberof_group.first().users.filter(username="user4_sn").exists(),
220                "User not a member of the group",
221            )

Test group sync when membership is derived from memberOf user attribute

def test_sync_groups_ad(self):
223    def test_sync_groups_ad(self):
224        """Test group sync"""
225        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
226        self.source.additional_user_dn = ""
227        self.source.additional_group_dn = ""
228        self.source.save()
229        self.source.user_property_mappings.set(
230            LDAPSourcePropertyMapping.objects.filter(
231                Q(managed__startswith="goauthentik.io/sources/ldap/default")
232                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
233            )
234        )
235        self.source.group_property_mappings.set(
236            LDAPSourcePropertyMapping.objects.filter(
237                managed="goauthentik.io/sources/ldap/default-name"
238            )
239        )
240        connection = MagicMock(return_value=mock_ad_connection())
241        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
242            _user = create_test_admin_user()
243            parent_group = Group.objects.get(name=_user.username)
244            self.source.sync_parent_group = parent_group
245            self.source.save()
246            group_sync = GroupLDAPSynchronizer(self.source, Task())
247            group_sync.sync_full()
248            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
249            membership_sync.sync_full()
250            group: Group = Group.objects.filter(name="Test Group").first()
251            self.assertIsNotNone(group)
252            self.assertEqual(group.parents.first(), parent_group)

Test group sync

def test_sync_groups_openldap(self):
254    def test_sync_groups_openldap(self):
255        """Test group sync"""
256        self.source.object_uniqueness_field = "uid"
257        self.source.group_object_filter = "(objectClass=groupOfNames)"
258        self.source.user_property_mappings.set(
259            LDAPSourcePropertyMapping.objects.filter(
260                Q(managed__startswith="goauthentik.io/sources/ldap/default")
261                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
262            )
263        )
264        self.source.group_property_mappings.set(
265            LDAPSourcePropertyMapping.objects.filter(
266                managed="goauthentik.io/sources/ldap/openldap-cn"
267            )
268        )
269        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
270        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
271            self.source.save()
272            group_sync = GroupLDAPSynchronizer(self.source, Task())
273            group_sync.sync_full()
274            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
275            membership_sync.sync_full()
276            group = Group.objects.filter(name="group1")
277            self.assertTrue(group.exists())

Test group sync

def test_sync_groups_openldap_posix_group(self):
279    def test_sync_groups_openldap_posix_group(self):
280        """Test posix group sync"""
281        self.source.object_uniqueness_field = "cn"
282        self.source.group_membership_field = "memberUid"
283        self.source.user_object_filter = "(objectClass=posixAccount)"
284        self.source.group_object_filter = "(objectClass=posixGroup)"
285        self.source.user_membership_attribute = "uid"
286        self.source.user_property_mappings.set(
287            [
288                *LDAPSourcePropertyMapping.objects.filter(
289                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
290                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
291                ).all(),
292                LDAPSourcePropertyMapping.objects.create(
293                    name="name",
294                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
295                ),
296            ]
297        )
298        self.source.group_property_mappings.set(
299            LDAPSourcePropertyMapping.objects.filter(
300                managed="goauthentik.io/sources/ldap/openldap-cn"
301            )
302        )
303        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
304        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
305            self.source.save()
306            user_sync = UserLDAPSynchronizer(self.source, Task())
307            user_sync.sync_full()
308            group_sync = GroupLDAPSynchronizer(self.source, Task())
309            group_sync.sync_full()
310            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
311            membership_sync.sync_full()
312            # Test if membership mapping based on memberUid works.
313            posix_group = Group.objects.filter(name="group-posix").first()
314            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

Test posix group sync

def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
316    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
317        """Test posix group sync"""
318        self.source.object_uniqueness_field = "cn"
319        self.source.group_membership_field = "memberUid"
320        self.source.user_object_filter = "(objectClass=posixAccount)"
321        self.source.group_object_filter = "(objectClass=posixGroup)"
322        self.source.user_membership_attribute = "cn"
323        self.source.user_property_mappings.set(
324            [
325                *LDAPSourcePropertyMapping.objects.filter(
326                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
327                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
328                ).all(),
329                LDAPSourcePropertyMapping.objects.create(
330                    name="name",
331                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
332                ),
333            ]
334        )
335        self.source.group_property_mappings.set(
336            LDAPSourcePropertyMapping.objects.filter(
337                managed="goauthentik.io/sources/ldap/openldap-cn"
338            )
339        )
340        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
341        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
342            self.source.save()
343            user_sync = UserLDAPSynchronizer(self.source, Task())
344            user_sync.sync_full()
345            group_sync = GroupLDAPSynchronizer(self.source, Task())
346            group_sync.sync_full()
347            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
348            membership_sync.sync_full()
349            # Test if membership mapping based on memberUid works.
350            posix_group = Group.objects.filter(name="group-posix").first()
351            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

Test posix group sync

def test_tasks_ad(self):
353    def test_tasks_ad(self):
354        """Test Scheduled tasks"""
355        self.source.user_property_mappings.set(
356            LDAPSourcePropertyMapping.objects.filter(
357                Q(managed__startswith="goauthentik.io/sources/ldap/default")
358                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
359            )
360        )
361        self.source.save()
362        connection = MagicMock(return_value=mock_ad_connection())
363        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
364            ldap_sync.send(self.source.pk)

Test Scheduled tasks

def test_tasks_openldap(self):
366    def test_tasks_openldap(self):
367        """Test Scheduled tasks"""
368        self.source.object_uniqueness_field = "uid"
369        self.source.group_object_filter = "(objectClass=groupOfNames)"
370        self.source.user_property_mappings.set(
371            LDAPSourcePropertyMapping.objects.filter(
372                Q(managed__startswith="goauthentik.io/sources/ldap/default")
373                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
374            )
375        )
376        self.source.save()
377        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
378        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
379            ldap_sync.send(self.source.pk)

Test Scheduled tasks

def test_user_deletion(self):
381    def test_user_deletion(self):
382        """Test user deletion"""
383        user = User.objects.create_user(username="not-in-the-source")
384        UserLDAPSourceConnection.objects.create(
385            user=user, source=self.source, identifier="not-in-the-source"
386        )
387        self.source.object_uniqueness_field = "uid"
388        self.source.group_object_filter = "(objectClass=groupOfNames)"
389        self.source.delete_not_found_objects = True
390        self.source.save()
391
392        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
393        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
394            ldap_sync.send(self.source.pk)
395        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())

Test user deletion

def test_user_deletion_still_in_source(self):
397    def test_user_deletion_still_in_source(self):
398        """Test that user is not deleted if it's still in the source"""
399        username = user_in_slapd_cn
400        identifier = user_in_slapd_uid
401        user = User.objects.create_user(username=username)
402        UserLDAPSourceConnection.objects.create(
403            user=user, source=self.source, identifier=identifier
404        )
405        self.source.object_uniqueness_field = "uid"
406        self.source.group_object_filter = "(objectClass=groupOfNames)"
407        self.source.delete_not_found_objects = True
408        self.source.save()
409
410        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
411        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
412            ldap_sync.send(self.source.pk)
413        self.assertTrue(User.objects.filter(username=username).exists())

Test that user is not deleted if it's still in the source

def test_user_deletion_no_sync(self):
415    def test_user_deletion_no_sync(self):
416        """Test that user is not deleted if sync_users is False"""
417        user = User.objects.create_user(username="not-in-the-source")
418        UserLDAPSourceConnection.objects.create(
419            user=user, source=self.source, identifier="not-in-the-source"
420        )
421        self.source.object_uniqueness_field = "uid"
422        self.source.group_object_filter = "(objectClass=groupOfNames)"
423        self.source.delete_not_found_objects = True
424        self.source.sync_users = False
425        self.source.save()
426
427        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
428        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
429            ldap_sync.send(self.source.pk)
430        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

Test that user is not deleted if sync_users is False

def test_user_deletion_no_delete(self):
432    def test_user_deletion_no_delete(self):
433        """Test that user is not deleted if delete_not_found_objects is False"""
434        user = User.objects.create_user(username="not-in-the-source")
435        UserLDAPSourceConnection.objects.create(
436            user=user, source=self.source, identifier="not-in-the-source"
437        )
438        self.source.object_uniqueness_field = "uid"
439        self.source.group_object_filter = "(objectClass=groupOfNames)"
440        self.source.save()
441
442        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
443        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
444            ldap_sync.send(self.source.pk)
445        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

Test that user is not deleted if delete_not_found_objects is False

def test_group_deletion(self):
447    def test_group_deletion(self):
448        """Test group deletion"""
449        group = Group.objects.create(name="not-in-the-source")
450        GroupLDAPSourceConnection.objects.create(
451            group=group, source=self.source, identifier="not-in-the-source"
452        )
453        self.source.object_uniqueness_field = "uid"
454        self.source.group_object_filter = "(objectClass=groupOfNames)"
455        self.source.delete_not_found_objects = True
456        self.source.save()
457
458        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
459        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
460            ldap_sync.send(self.source.pk)
461        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())

Test group deletion

def test_group_deletion_still_in_source(self):
463    def test_group_deletion_still_in_source(self):
464        """Test that group is not deleted if it's still in the source"""
465        groupname = group_in_slapd_cn
466        identifier = group_in_slapd_uid
467        group = Group.objects.create(name=groupname)
468        GroupLDAPSourceConnection.objects.create(
469            group=group, source=self.source, identifier=identifier
470        )
471        self.source.object_uniqueness_field = "uid"
472        self.source.group_object_filter = "(objectClass=groupOfNames)"
473        self.source.delete_not_found_objects = True
474        self.source.save()
475
476        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
477        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
478            ldap_sync.send(self.source.pk)
479        self.assertTrue(Group.objects.filter(name=groupname).exists())

Test that group is not deleted if it's still in the source

def test_group_deletion_no_sync(self):
481    def test_group_deletion_no_sync(self):
482        """Test that group is not deleted if sync_groups is False"""
483        group = Group.objects.create(name="not-in-the-source")
484        GroupLDAPSourceConnection.objects.create(
485            group=group, source=self.source, identifier="not-in-the-source"
486        )
487        self.source.object_uniqueness_field = "uid"
488        self.source.group_object_filter = "(objectClass=groupOfNames)"
489        self.source.delete_not_found_objects = True
490        self.source.sync_groups = False
491        self.source.save()
492
493        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
494        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
495            ldap_sync.send(self.source.pk)
496        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

Test that group is not deleted if sync_groups is False

def test_group_deletion_no_delete(self):
498    def test_group_deletion_no_delete(self):
499        """Test that group is not deleted if delete_not_found_objects is False"""
500        group = Group.objects.create(name="not-in-the-source")
501        GroupLDAPSourceConnection.objects.create(
502            group=group, source=self.source, identifier="not-in-the-source"
503        )
504        self.source.object_uniqueness_field = "uid"
505        self.source.group_object_filter = "(objectClass=groupOfNames)"
506        self.source.save()
507
508        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
509        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
510            ldap_sync.send(self.source.pk)
511        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

Test that group is not deleted if delete_not_found_objects is False

def test_batch_deletion(self):
513    def test_batch_deletion(self):
514        """Test batch deletion"""
515        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
516        for i in range(BATCH_SIZE):
517            user = User.objects.create_user(username=f"not-in-the-source-{i}")
518            group = Group.objects.create(name=f"not-in-the-source-{i}")
519            group.users.add(user)
520            UserLDAPSourceConnection.objects.create(
521                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
522            )
523            GroupLDAPSourceConnection.objects.create(
524                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
525            )
526        self.source.object_uniqueness_field = "uid"
527        self.source.group_object_filter = "(objectClass=groupOfNames)"
528        self.source.delete_not_found_objects = True
529        self.source.save()
530
531        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
532        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
533            ldap_sync.send(self.source.pk)
534
535        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
536        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())

Test batch deletion

def test_membership_sync_special_chars_in_group_dn(self):
538    def test_membership_sync_special_chars_in_group_dn(self):
539        """Test membership synchronization with special characters in group DN"""
540        self.source.object_uniqueness_field = "uid"
541        self.source.group_object_filter = "(objectClass=groupOfNames)"
542        self.source.lookup_groups_from_user = True
543        self.source.group_membership_field = "memberOf"
544
545        # Mock connection with group DN containing special characters
546        mock_conn = MagicMock()
547
548        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
549        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
550        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
551        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
552
553        # Mock the paged_search method that would be called with the filter
554        mock_standard = MagicMock()
555        mock_conn.extend.standard = mock_standard
556
557        # Test case 1: Group DN with parentheses
558        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
559            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
560
561            # Simulate group data with special characters in DN
562            page_data = [{"dn": special_group_dn}]
563
564            # This should not raise LDAPInvalidFilterError anymore
565            try:
566                membership_sync.sync(page_data)
567                # Verify that the filter was properly escaped
568                # The call should have been made with escaped characters
569                mock_standard.paged_search.assert_called()
570                call_args = mock_standard.paged_search.call_args
571                search_filter = call_args[1]["search_filter"]
572                # The parentheses should be escaped as \28 and \29
573                self.assertIn("\\28", search_filter)  # Escaped (
574                self.assertIn("\\29", search_filter)  # Escaped )
575            except LDAPInvalidFilterError:
576                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
577
578        # Test case 2: Group DN with backslashes
579        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
580            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
581            page_data = [{"dn": backslash_group_dn}]
582
583            try:
584                membership_sync.sync(page_data)
585                call_args = mock_standard.paged_search.call_args
586                search_filter = call_args[1]["search_filter"]
587                # The backslash should be escaped as \5c
588                self.assertIn("\\5c", search_filter)  # Escaped \
589            except LDAPInvalidFilterError:
590                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
591
592        # Test case 3: Group DN with asterisks
593        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
594            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
595            page_data = [{"dn": asterisk_group_dn}]
596
597            try:
598                membership_sync.sync(page_data)
599                call_args = mock_standard.paged_search.call_args
600                search_filter = call_args[1]["search_filter"]
601                # The asterisk should be escaped as \2a
602                self.assertIn("\\2a", search_filter)  # Escaped *
603            except LDAPInvalidFilterError:
604                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")

Test membership synchronization with special characters in group DN

def test_escape_filter_chars_function(self):
606    def test_escape_filter_chars_function(self):
607        """Test the escape_filter_chars function directly"""
608
609        # Test various special characters that need escaping
610        test_cases = [
611            ("test(group)", "test\\28group\\29"),  # parentheses
612            ("test\\group", "test\\5cgroup"),  # backslash
613            ("test*group", "test\\2agroup"),  # asterisk
614            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
615            ("normalgroup", "normalgroup"),  # no special chars
616            ("", ""),  # empty string
617        ]
618
619        for input_str, expected in test_cases:
620            with self.subTest(input_str=input_str):
621                result = escape_filter_chars(input_str)
622                self.assertEqual(result, expected)

Test the escape_filter_chars function directly