authentik.sources.ldap.tests.test_sync

LDAP Source tests

  1"""LDAP Source tests"""
  2
  3from unittest.mock import MagicMock, patch
  4
  5from django.db.models import Q
  6from django.test import TestCase
  7from ldap3.core.exceptions import LDAPInvalidFilterError
  8from ldap3.utils.conv import escape_filter_chars
  9
 10from authentik.blueprints.tests import apply_blueprint
 11from authentik.core.models import Group, User
 12from authentik.core.tests.utils import create_test_admin_user
 13from authentik.events.models import Event, EventAction
 14from authentik.lib.generators import generate_id, generate_key
 15from authentik.lib.sync.outgoing.exceptions import StopSync
 16from authentik.lib.utils.reflection import class_to_path
 17from authentik.sources.ldap.models import (
 18    GroupLDAPSourceConnection,
 19    LDAPSource,
 20    LDAPSourcePropertyMapping,
 21    UserLDAPSourceConnection,
 22)
 23from authentik.sources.ldap.sync.forward_delete_users import DELETE_CHUNK_SIZE
 24from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
 25from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
 26from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
 27from authentik.sources.ldap.tasks import ldap_sync, ldap_sync_page
 28from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
 29from authentik.sources.ldap.tests.mock_freeipa import mock_freeipa_connection
 30from authentik.sources.ldap.tests.mock_slapd import (
 31    group_in_slapd_cn,
 32    group_in_slapd_uid,
 33    mock_slapd_connection,
 34    user_in_slapd_cn,
 35    user_in_slapd_uid,
 36)
 37from authentik.tasks.models import Task
 38
 39LDAP_PASSWORD = generate_key()
 40
 41
 42class LDAPSyncTests(TestCase):
 43    """LDAP Sync tests"""
 44
 45    @apply_blueprint("system/sources-ldap.yaml")
 46    def setUp(self):
 47        self.source: LDAPSource = LDAPSource.objects.create(
 48            name="ldap",
 49            slug="ldap",
 50            base_dn="dc=goauthentik,dc=io",
 51            additional_user_dn="ou=users",
 52            additional_group_dn="ou=groups",
 53        )
 54
 55    def test_sync_missing_page(self):
 56        """Test sync with missing page"""
 57        connection = MagicMock(return_value=mock_ad_connection())
 58        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 59            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")
 60
 61    def test_sync_error(self):
 62        """Test user sync"""
 63        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 64        self.source.additional_user_dn = ""
 65        self.source.additional_group_dn = ""
 66        self.source.save()
 67        self.source.user_property_mappings.set(
 68            LDAPSourcePropertyMapping.objects.filter(
 69                Q(managed__startswith="goauthentik.io/sources/ldap/default")
 70                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
 71            )
 72        )
 73        mapping = LDAPSourcePropertyMapping.objects.create(
 74            name="name",
 75            expression="q",
 76        )
 77        self.source.user_property_mappings.set([mapping])
 78        self.source.save()
 79        connection = MagicMock(return_value=mock_ad_connection())
 80        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 81            user_sync = UserLDAPSynchronizer(self.source, Task())
 82            with self.assertRaises(StopSync):
 83                user_sync.sync_full()
 84            self.assertFalse(User.objects.filter(username="user0_sn").exists())
 85            self.assertFalse(User.objects.filter(username="user1_sn").exists())
 86        events = Event.objects.filter(
 87            action=EventAction.CONFIGURATION_ERROR,
 88            context__message="Failed to evaluate property mapping: 'name'",
 89            context__mapping__pk=mapping.pk.hex,
 90        )
 91        self.assertTrue(events.exists())
 92
 93    def test_sync_mapping(self):
 94        """Test property mappings"""
 95        none = LDAPSourcePropertyMapping.objects.create(
 96            name=generate_id(), expression="return None"
 97        )
 98        byte_mapping = LDAPSourcePropertyMapping.objects.create(
 99            name=generate_id(), expression="return b''"
100        )
101        self.source.user_property_mappings.set(
102            LDAPSourcePropertyMapping.objects.filter(
103                Q(managed__startswith="goauthentik.io/sources/ldap/default")
104                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
105            )
106        )
107        self.source.user_property_mappings.add(none, byte_mapping)
108        connection = MagicMock(return_value=mock_ad_connection())
109
110        # we basically just test that the mappings don't throw errors
111        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
112            user_sync = UserLDAPSynchronizer(self.source, Task())
113            user_sync.sync_full()
114
115    def test_sync_users_ad(self):
116        """Test user sync"""
117        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
118        self.source.additional_user_dn = ""
119        self.source.additional_group_dn = ""
120        self.source.save()
121        self.source.user_property_mappings.set(
122            LDAPSourcePropertyMapping.objects.filter(
123                Q(managed__startswith="goauthentik.io/sources/ldap/default")
124                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
125            )
126        )
127        connection = MagicMock(return_value=mock_ad_connection())
128
129        # Create the user beforehand so we can set attributes and check they aren't removed
130        user = User.objects.create(
131            username="erin.h",
132            attributes={
133                "foo": "bar",
134            },
135        )
136        UserLDAPSourceConnection.objects.create(
137            user=user,
138            source=self.source,
139            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
140        )
141
142        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
143            user_sync = UserLDAPSynchronizer(self.source, Task())
144            user_sync.sync_full()
145
146            user.refresh_from_db()
147            self.assertEqual(user.name, "Erin M. Hagens")
148            self.assertEqual(user.attributes["foo"], "bar")
149            self.assertTrue(user.is_active)
150            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
151
152            deactivated = User.objects.filter(username="deactivated.a").first()
153            self.assertIsNotNone(deactivated)
154            self.assertFalse(deactivated.is_active)
155
156    def test_sync_ad_legacy(self):
157        """Test user sync"""
158        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
159        self.source.additional_user_dn = ""
160        self.source.additional_group_dn = ""
161        self.source.save()
162        self.source.user_property_mappings.set(
163            LDAPSourcePropertyMapping.objects.filter(
164                Q(managed__startswith="goauthentik.io/sources/ldap/default")
165                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
166            )
167        )
168        self.source.group_property_mappings.set(
169            LDAPSourcePropertyMapping.objects.filter(
170                managed="goauthentik.io/sources/ldap/default-name"
171            )
172        )
173        connection = MagicMock(return_value=mock_ad_connection())
174
175        # Create the user beforehand so we can set attributes and check they aren't removed
176        user = User.objects.create(
177            username="erin.h",
178            attributes={
179                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
180                "foo": "bar",
181            },
182        )
183        group = Group.objects.create(
184            name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
185        )
186
187        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
188            user_sync = UserLDAPSynchronizer(self.source, Task())
189            user_sync.sync_full()
190            group_sync = GroupLDAPSynchronizer(self.source, Task())
191            group_sync.sync_full()
192
193            user.refresh_from_db()
194            group.refresh_from_db()
195
196            self.assertEqual(user.name, "Erin M. Hagens")
197            self.assertEqual(user.attributes["foo"], "bar")
198            self.assertTrue(user.is_active)
199            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
200            self.assertTrue(
201                UserLDAPSourceConnection.objects.filter(
202                    source=self.source,
203                    user=user,
204                    identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
205                ).exists()
206            )
207
208            deactivated = User.objects.filter(username="deactivated.a").first()
209            self.assertIsNotNone(deactivated)
210            self.assertFalse(deactivated.is_active)
211
212            self.assertEqual(group.name, "Administrators")
213            self.assertTrue(
214                GroupLDAPSourceConnection.objects.filter(
215                    source=self.source, group=group, identifier="S-1-5-32-544"
216                ).exists()
217            )
218            self.assertEqual(group.attributes["foo"], "bar")
219
220    def test_sync_users_openldap(self):
221        """Test user sync"""
222        self.source.object_uniqueness_field = "uid"
223        self.source.user_property_mappings.set(
224            LDAPSourcePropertyMapping.objects.filter(
225                Q(managed__startswith="goauthentik.io/sources/ldap/default")
226                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
227            )
228        )
229        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
230        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
231            user_sync = UserLDAPSynchronizer(self.source, Task())
232            user_sync.sync_full()
233            self.assertTrue(User.objects.filter(username="user0_sn").exists())
234            self.assertFalse(User.objects.filter(username="user1_sn").exists())
235
236    def test_sync_users_freeipa_ish(self):
237        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
238        self.source.object_uniqueness_field = "uid"
239        self.source.user_property_mappings.set(
240            LDAPSourcePropertyMapping.objects.filter(
241                Q(managed__startswith="goauthentik.io/sources/ldap/default")
242                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
243            )
244        )
245        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
246        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
247            user_sync = UserLDAPSynchronizer(self.source, Task())
248            user_sync.sync_full()
249            self.assertTrue(User.objects.filter(username="user0_sn").exists())
250            self.assertFalse(User.objects.filter(username="user1_sn").exists())
251            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)
252
253    def test_sync_groups_freeipa_memberOf(self):
254        """Test group sync when membership is derived from memberOf user attribute"""
255        self.source.object_uniqueness_field = "uid"
256        self.source.group_object_filter = "(objectClass=groupOfNames)"
257        self.source.lookup_groups_from_user = True
258        self.source.group_membership_field = "memberOf"
259        self.source.user_property_mappings.set(
260            LDAPSourcePropertyMapping.objects.filter(
261                Q(managed__startswith="goauthentik.io/sources/ldap/default")
262                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
263            )
264        )
265        self.source.group_property_mappings.set(
266            LDAPSourcePropertyMapping.objects.filter(
267                managed="goauthentik.io/sources/ldap/openldap-cn"
268            )
269        )
270        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
271        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
272            user_sync = UserLDAPSynchronizer(self.source, Task())
273            user_sync.sync_full()
274            group_sync = GroupLDAPSynchronizer(self.source, Task())
275            group_sync.sync_full()
276            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
277            membership_sync.sync_full()
278
279            self.assertTrue(
280                User.objects.filter(username="user4_sn").exists(), "User does not exist"
281            )
282            # Test if membership mapping based on memberOf works.
283            memberof_group = Group.objects.filter(name="reverse-lookup-group")
284            self.assertTrue(memberof_group.exists(), "Group does not exist")
285            self.assertTrue(
286                memberof_group.first().users.filter(username="user4_sn").exists(),
287                "User not a member of the group",
288            )
289
290    def test_sync_groups_ad(self):
291        """Test group sync"""
292        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
293        self.source.additional_user_dn = ""
294        self.source.additional_group_dn = ""
295        self.source.save()
296        self.source.user_property_mappings.set(
297            LDAPSourcePropertyMapping.objects.filter(
298                Q(managed__startswith="goauthentik.io/sources/ldap/default")
299                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
300            )
301        )
302        self.source.group_property_mappings.set(
303            LDAPSourcePropertyMapping.objects.filter(
304                managed="goauthentik.io/sources/ldap/default-name"
305            )
306        )
307        connection = MagicMock(return_value=mock_ad_connection())
308        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
309            _user = create_test_admin_user()
310            parent_group = Group.objects.get(name=_user.username)
311            self.source.sync_parent_group = parent_group
312            self.source.save()
313            group_sync = GroupLDAPSynchronizer(self.source, Task())
314            group_sync.sync_full()
315            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
316            membership_sync.sync_full()
317            group: Group = Group.objects.filter(name="Test Group").first()
318            self.assertIsNotNone(group)
319            self.assertEqual(group.parents.first(), parent_group)
320
321    def test_sync_groups_openldap(self):
322        """Test group sync"""
323        self.source.object_uniqueness_field = "uid"
324        self.source.group_object_filter = "(objectClass=groupOfNames)"
325        self.source.user_property_mappings.set(
326            LDAPSourcePropertyMapping.objects.filter(
327                Q(managed__startswith="goauthentik.io/sources/ldap/default")
328                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
329            )
330        )
331        self.source.group_property_mappings.set(
332            LDAPSourcePropertyMapping.objects.filter(
333                managed="goauthentik.io/sources/ldap/openldap-cn"
334            )
335        )
336        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
337        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
338            self.source.save()
339            group_sync = GroupLDAPSynchronizer(self.source, Task())
340            group_sync.sync_full()
341            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
342            membership_sync.sync_full()
343            group = Group.objects.filter(name="group1")
344            self.assertTrue(group.exists())
345
346    def test_sync_groups_openldap_posix_group(self):
347        """Test posix group sync"""
348        self.source.object_uniqueness_field = "cn"
349        self.source.group_membership_field = "memberUid"
350        self.source.user_object_filter = "(objectClass=posixAccount)"
351        self.source.group_object_filter = "(objectClass=posixGroup)"
352        self.source.user_membership_attribute = "uid"
353        self.source.user_property_mappings.set(
354            [
355                *LDAPSourcePropertyMapping.objects.filter(
356                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
357                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
358                ).all(),
359                LDAPSourcePropertyMapping.objects.create(
360                    name="name",
361                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
362                ),
363            ]
364        )
365        self.source.group_property_mappings.set(
366            LDAPSourcePropertyMapping.objects.filter(
367                managed="goauthentik.io/sources/ldap/openldap-cn"
368            )
369        )
370        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
371        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
372            self.source.save()
373            user_sync = UserLDAPSynchronizer(self.source, Task())
374            user_sync.sync_full()
375            group_sync = GroupLDAPSynchronizer(self.source, Task())
376            group_sync.sync_full()
377            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
378            membership_sync.sync_full()
379            # Test if membership mapping based on memberUid works.
380            posix_group = Group.objects.filter(name="group-posix").first()
381            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
382
383    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
384        """Test posix group sync"""
385        self.source.object_uniqueness_field = "cn"
386        self.source.group_membership_field = "memberUid"
387        self.source.user_object_filter = "(objectClass=posixAccount)"
388        self.source.group_object_filter = "(objectClass=posixGroup)"
389        self.source.user_membership_attribute = "cn"
390        self.source.user_property_mappings.set(
391            [
392                *LDAPSourcePropertyMapping.objects.filter(
393                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
394                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
395                ).all(),
396                LDAPSourcePropertyMapping.objects.create(
397                    name="name",
398                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
399                ),
400            ]
401        )
402        self.source.group_property_mappings.set(
403            LDAPSourcePropertyMapping.objects.filter(
404                managed="goauthentik.io/sources/ldap/openldap-cn"
405            )
406        )
407        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
408        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
409            self.source.save()
410            user_sync = UserLDAPSynchronizer(self.source, Task())
411            user_sync.sync_full()
412            group_sync = GroupLDAPSynchronizer(self.source, Task())
413            group_sync.sync_full()
414            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
415            membership_sync.sync_full()
416            # Test if membership mapping based on memberUid works.
417            posix_group = Group.objects.filter(name="group-posix").first()
418            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
419
420    def test_tasks_ad(self):
421        """Test Scheduled tasks"""
422        self.source.user_property_mappings.set(
423            LDAPSourcePropertyMapping.objects.filter(
424                Q(managed__startswith="goauthentik.io/sources/ldap/default")
425                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
426            )
427        )
428        self.source.save()
429        connection = MagicMock(return_value=mock_ad_connection())
430        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
431            ldap_sync.send(self.source.pk)
432
433    def test_tasks_openldap(self):
434        """Test Scheduled tasks"""
435        self.source.object_uniqueness_field = "uid"
436        self.source.group_object_filter = "(objectClass=groupOfNames)"
437        self.source.user_property_mappings.set(
438            LDAPSourcePropertyMapping.objects.filter(
439                Q(managed__startswith="goauthentik.io/sources/ldap/default")
440                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
441            )
442        )
443        self.source.save()
444        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
445        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
446            ldap_sync.send(self.source.pk)
447
448    def test_user_deletion(self):
449        """Test user deletion"""
450        user = User.objects.create_user(username="not-in-the-source")
451        UserLDAPSourceConnection.objects.create(
452            user=user, source=self.source, identifier="not-in-the-source"
453        )
454        self.source.object_uniqueness_field = "uid"
455        self.source.group_object_filter = "(objectClass=groupOfNames)"
456        self.source.delete_not_found_objects = True
457        self.source.save()
458
459        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
460        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
461            ldap_sync.send(self.source.pk)
462        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())
463
464    def test_user_deletion_still_in_source(self):
465        """Test that user is not deleted if it's still in the source"""
466        username = user_in_slapd_cn
467        identifier = user_in_slapd_uid
468        user = User.objects.create_user(username=username)
469        UserLDAPSourceConnection.objects.create(
470            user=user, source=self.source, identifier=identifier
471        )
472        self.source.object_uniqueness_field = "uid"
473        self.source.group_object_filter = "(objectClass=groupOfNames)"
474        self.source.delete_not_found_objects = True
475        self.source.save()
476
477        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
478        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
479            ldap_sync.send(self.source.pk)
480        self.assertTrue(User.objects.filter(username=username).exists())
481
482    def test_user_deletion_no_sync(self):
483        """Test that user is not deleted if sync_users is False"""
484        user = User.objects.create_user(username="not-in-the-source")
485        UserLDAPSourceConnection.objects.create(
486            user=user, source=self.source, identifier="not-in-the-source"
487        )
488        self.source.object_uniqueness_field = "uid"
489        self.source.group_object_filter = "(objectClass=groupOfNames)"
490        self.source.delete_not_found_objects = True
491        self.source.sync_users = False
492        self.source.save()
493
494        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
495        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
496            ldap_sync.send(self.source.pk)
497        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
498
499    def test_user_deletion_no_delete(self):
500        """Test that user is not deleted if delete_not_found_objects is False"""
501        user = User.objects.create_user(username="not-in-the-source")
502        UserLDAPSourceConnection.objects.create(
503            user=user, source=self.source, identifier="not-in-the-source"
504        )
505        self.source.object_uniqueness_field = "uid"
506        self.source.group_object_filter = "(objectClass=groupOfNames)"
507        self.source.save()
508
509        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
510        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
511            ldap_sync.send(self.source.pk)
512        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
513
514    def test_group_deletion(self):
515        """Test group deletion"""
516        group = Group.objects.create(name="not-in-the-source")
517        GroupLDAPSourceConnection.objects.create(
518            group=group, source=self.source, identifier="not-in-the-source"
519        )
520        self.source.object_uniqueness_field = "uid"
521        self.source.group_object_filter = "(objectClass=groupOfNames)"
522        self.source.delete_not_found_objects = True
523        self.source.save()
524
525        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
526        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
527            ldap_sync.send(self.source.pk)
528        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())
529
530    def test_group_deletion_still_in_source(self):
531        """Test that group is not deleted if it's still in the source"""
532        groupname = group_in_slapd_cn
533        identifier = group_in_slapd_uid
534        group = Group.objects.create(name=groupname)
535        GroupLDAPSourceConnection.objects.create(
536            group=group, source=self.source, identifier=identifier
537        )
538        self.source.object_uniqueness_field = "uid"
539        self.source.group_object_filter = "(objectClass=groupOfNames)"
540        self.source.delete_not_found_objects = True
541        self.source.save()
542
543        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
544        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
545            ldap_sync.send(self.source.pk)
546        self.assertTrue(Group.objects.filter(name=groupname).exists())
547
548    def test_group_deletion_no_sync(self):
549        """Test that group is not deleted if sync_groups is False"""
550        group = Group.objects.create(name="not-in-the-source")
551        GroupLDAPSourceConnection.objects.create(
552            group=group, source=self.source, identifier="not-in-the-source"
553        )
554        self.source.object_uniqueness_field = "uid"
555        self.source.group_object_filter = "(objectClass=groupOfNames)"
556        self.source.delete_not_found_objects = True
557        self.source.sync_groups = False
558        self.source.save()
559
560        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
561        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
562            ldap_sync.send(self.source.pk)
563        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
564
565    def test_group_deletion_no_delete(self):
566        """Test that group is not deleted if delete_not_found_objects is False"""
567        group = Group.objects.create(name="not-in-the-source")
568        GroupLDAPSourceConnection.objects.create(
569            group=group, source=self.source, identifier="not-in-the-source"
570        )
571        self.source.object_uniqueness_field = "uid"
572        self.source.group_object_filter = "(objectClass=groupOfNames)"
573        self.source.save()
574
575        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
576        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
577            ldap_sync.send(self.source.pk)
578        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
579
580    def test_batch_deletion(self):
581        """Test batch deletion"""
582        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
583        for i in range(BATCH_SIZE):
584            user = User.objects.create_user(username=f"not-in-the-source-{i}")
585            group = Group.objects.create(name=f"not-in-the-source-{i}")
586            group.users.add(user)
587            UserLDAPSourceConnection.objects.create(
588                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
589            )
590            GroupLDAPSourceConnection.objects.create(
591                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
592            )
593        self.source.object_uniqueness_field = "uid"
594        self.source.group_object_filter = "(objectClass=groupOfNames)"
595        self.source.delete_not_found_objects = True
596        self.source.save()
597
598        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
599        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
600            ldap_sync.send(self.source.pk)
601
602        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
603        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())
604
605    def test_membership_sync_special_chars_in_group_dn(self):
606        """Test membership synchronization with special characters in group DN"""
607        self.source.object_uniqueness_field = "uid"
608        self.source.group_object_filter = "(objectClass=groupOfNames)"
609        self.source.lookup_groups_from_user = True
610        self.source.group_membership_field = "memberOf"
611
612        # Mock connection with group DN containing special characters
613        mock_conn = MagicMock()
614
615        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
616        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
617        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
618        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
619
620        # Mock the paged_search method that would be called with the filter
621        mock_standard = MagicMock()
622        mock_conn.extend.standard = mock_standard
623
624        # Test case 1: Group DN with parentheses
625        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
626            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
627
628            # Simulate group data with special characters in DN
629            page_data = [{"dn": special_group_dn}]
630
631            # This should not raise LDAPInvalidFilterError anymore
632            try:
633                membership_sync.sync(page_data)
634                # Verify that the filter was properly escaped
635                # The call should have been made with escaped characters
636                mock_standard.paged_search.assert_called()
637                call_args = mock_standard.paged_search.call_args
638                search_filter = call_args[1]["search_filter"]
639                # The parentheses should be escaped as \28 and \29
640                self.assertIn("\\28", search_filter)  # Escaped (
641                self.assertIn("\\29", search_filter)  # Escaped )
642            except LDAPInvalidFilterError:
643                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
644
645        # Test case 2: Group DN with backslashes
646        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
647            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
648            page_data = [{"dn": backslash_group_dn}]
649
650            try:
651                membership_sync.sync(page_data)
652                call_args = mock_standard.paged_search.call_args
653                search_filter = call_args[1]["search_filter"]
654                # The backslash should be escaped as \5c
655                self.assertIn("\\5c", search_filter)  # Escaped \
656            except LDAPInvalidFilterError:
657                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
658
659        # Test case 3: Group DN with asterisks
660        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
661            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
662            page_data = [{"dn": asterisk_group_dn}]
663
664            try:
665                membership_sync.sync(page_data)
666                call_args = mock_standard.paged_search.call_args
667                search_filter = call_args[1]["search_filter"]
668                # The asterisk should be escaped as \2a
669                self.assertIn("\\2a", search_filter)  # Escaped *
670            except LDAPInvalidFilterError:
671                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
672
673    def test_escape_filter_chars_function(self):
674        """Test the escape_filter_chars function directly"""
675
676        # Test various special characters that need escaping
677        test_cases = [
678            ("test(group)", "test\\28group\\29"),  # parentheses
679            ("test\\group", "test\\5cgroup"),  # backslash
680            ("test*group", "test\\2agroup"),  # asterisk
681            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
682            ("normalgroup", "normalgroup"),  # no special chars
683            ("", ""),  # empty string
684        ]
685
686        for input_str, expected in test_cases:
687            with self.subTest(input_str=input_str):
688                result = escape_filter_chars(input_str)
689                self.assertEqual(result, expected)
LDAP_PASSWORD = 'Y@tP7,Ezqdn?S4).aBw#o!E4<g2v7ri\'"SWMk?lK*||6Yh7\\Hr{a57,I[gLn}LWG%sjdti4WKP8P8N:1#ZtY|\'srgj-aDTG[\\>AHd?XVGFq]&LX.N<&Ni8vlf;nv^`1,'
class LDAPSyncTests(django.test.testcases.TestCase):
 43class LDAPSyncTests(TestCase):
 44    """LDAP Sync tests"""
 45
 46    @apply_blueprint("system/sources-ldap.yaml")
 47    def setUp(self):
 48        self.source: LDAPSource = LDAPSource.objects.create(
 49            name="ldap",
 50            slug="ldap",
 51            base_dn="dc=goauthentik,dc=io",
 52            additional_user_dn="ou=users",
 53            additional_group_dn="ou=groups",
 54        )
 55
 56    def test_sync_missing_page(self):
 57        """Test sync with missing page"""
 58        connection = MagicMock(return_value=mock_ad_connection())
 59        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 60            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")
 61
 62    def test_sync_error(self):
 63        """Test user sync"""
 64        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 65        self.source.additional_user_dn = ""
 66        self.source.additional_group_dn = ""
 67        self.source.save()
 68        self.source.user_property_mappings.set(
 69            LDAPSourcePropertyMapping.objects.filter(
 70                Q(managed__startswith="goauthentik.io/sources/ldap/default")
 71                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
 72            )
 73        )
 74        mapping = LDAPSourcePropertyMapping.objects.create(
 75            name="name",
 76            expression="q",
 77        )
 78        self.source.user_property_mappings.set([mapping])
 79        self.source.save()
 80        connection = MagicMock(return_value=mock_ad_connection())
 81        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 82            user_sync = UserLDAPSynchronizer(self.source, Task())
 83            with self.assertRaises(StopSync):
 84                user_sync.sync_full()
 85            self.assertFalse(User.objects.filter(username="user0_sn").exists())
 86            self.assertFalse(User.objects.filter(username="user1_sn").exists())
 87        events = Event.objects.filter(
 88            action=EventAction.CONFIGURATION_ERROR,
 89            context__message="Failed to evaluate property mapping: 'name'",
 90            context__mapping__pk=mapping.pk.hex,
 91        )
 92        self.assertTrue(events.exists())
 93
 94    def test_sync_mapping(self):
 95        """Test property mappings"""
 96        none = LDAPSourcePropertyMapping.objects.create(
 97            name=generate_id(), expression="return None"
 98        )
 99        byte_mapping = LDAPSourcePropertyMapping.objects.create(
100            name=generate_id(), expression="return b''"
101        )
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(managed__startswith="goauthentik.io/sources/ldap/default")
105                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
106            )
107        )
108        self.source.user_property_mappings.add(none, byte_mapping)
109        connection = MagicMock(return_value=mock_ad_connection())
110
111        # we basically just test that the mappings don't throw errors
112        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
113            user_sync = UserLDAPSynchronizer(self.source, Task())
114            user_sync.sync_full()
115
116    def test_sync_users_ad(self):
117        """Test user sync"""
118        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
119        self.source.additional_user_dn = ""
120        self.source.additional_group_dn = ""
121        self.source.save()
122        self.source.user_property_mappings.set(
123            LDAPSourcePropertyMapping.objects.filter(
124                Q(managed__startswith="goauthentik.io/sources/ldap/default")
125                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
126            )
127        )
128        connection = MagicMock(return_value=mock_ad_connection())
129
130        # Create the user beforehand so we can set attributes and check they aren't removed
131        user = User.objects.create(
132            username="erin.h",
133            attributes={
134                "foo": "bar",
135            },
136        )
137        UserLDAPSourceConnection.objects.create(
138            user=user,
139            source=self.source,
140            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
141        )
142
143        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
144            user_sync = UserLDAPSynchronizer(self.source, Task())
145            user_sync.sync_full()
146
147            user.refresh_from_db()
148            self.assertEqual(user.name, "Erin M. Hagens")
149            self.assertEqual(user.attributes["foo"], "bar")
150            self.assertTrue(user.is_active)
151            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
152
153            deactivated = User.objects.filter(username="deactivated.a").first()
154            self.assertIsNotNone(deactivated)
155            self.assertFalse(deactivated.is_active)
156
157    def test_sync_ad_legacy(self):
158        """Test user sync"""
159        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
160        self.source.additional_user_dn = ""
161        self.source.additional_group_dn = ""
162        self.source.save()
163        self.source.user_property_mappings.set(
164            LDAPSourcePropertyMapping.objects.filter(
165                Q(managed__startswith="goauthentik.io/sources/ldap/default")
166                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
167            )
168        )
169        self.source.group_property_mappings.set(
170            LDAPSourcePropertyMapping.objects.filter(
171                managed="goauthentik.io/sources/ldap/default-name"
172            )
173        )
174        connection = MagicMock(return_value=mock_ad_connection())
175
176        # Create the user beforehand so we can set attributes and check they aren't removed
177        user = User.objects.create(
178            username="erin.h",
179            attributes={
180                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
181                "foo": "bar",
182            },
183        )
184        group = Group.objects.create(
185            name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
186        )
187
188        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
189            user_sync = UserLDAPSynchronizer(self.source, Task())
190            user_sync.sync_full()
191            group_sync = GroupLDAPSynchronizer(self.source, Task())
192            group_sync.sync_full()
193
194            user.refresh_from_db()
195            group.refresh_from_db()
196
197            self.assertEqual(user.name, "Erin M. Hagens")
198            self.assertEqual(user.attributes["foo"], "bar")
199            self.assertTrue(user.is_active)
200            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
201            self.assertTrue(
202                UserLDAPSourceConnection.objects.filter(
203                    source=self.source,
204                    user=user,
205                    identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
206                ).exists()
207            )
208
209            deactivated = User.objects.filter(username="deactivated.a").first()
210            self.assertIsNotNone(deactivated)
211            self.assertFalse(deactivated.is_active)
212
213            self.assertEqual(group.name, "Administrators")
214            self.assertTrue(
215                GroupLDAPSourceConnection.objects.filter(
216                    source=self.source, group=group, identifier="S-1-5-32-544"
217                ).exists()
218            )
219            self.assertEqual(group.attributes["foo"], "bar")
220
221    def test_sync_users_openldap(self):
222        """Test user sync"""
223        self.source.object_uniqueness_field = "uid"
224        self.source.user_property_mappings.set(
225            LDAPSourcePropertyMapping.objects.filter(
226                Q(managed__startswith="goauthentik.io/sources/ldap/default")
227                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
228            )
229        )
230        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
231        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
232            user_sync = UserLDAPSynchronizer(self.source, Task())
233            user_sync.sync_full()
234            self.assertTrue(User.objects.filter(username="user0_sn").exists())
235            self.assertFalse(User.objects.filter(username="user1_sn").exists())
236
237    def test_sync_users_freeipa_ish(self):
238        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
239        self.source.object_uniqueness_field = "uid"
240        self.source.user_property_mappings.set(
241            LDAPSourcePropertyMapping.objects.filter(
242                Q(managed__startswith="goauthentik.io/sources/ldap/default")
243                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
244            )
245        )
246        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
247        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
248            user_sync = UserLDAPSynchronizer(self.source, Task())
249            user_sync.sync_full()
250            self.assertTrue(User.objects.filter(username="user0_sn").exists())
251            self.assertFalse(User.objects.filter(username="user1_sn").exists())
252            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)
253
254    def test_sync_groups_freeipa_memberOf(self):
255        """Test group sync when membership is derived from memberOf user attribute"""
256        self.source.object_uniqueness_field = "uid"
257        self.source.group_object_filter = "(objectClass=groupOfNames)"
258        self.source.lookup_groups_from_user = True
259        self.source.group_membership_field = "memberOf"
260        self.source.user_property_mappings.set(
261            LDAPSourcePropertyMapping.objects.filter(
262                Q(managed__startswith="goauthentik.io/sources/ldap/default")
263                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
264            )
265        )
266        self.source.group_property_mappings.set(
267            LDAPSourcePropertyMapping.objects.filter(
268                managed="goauthentik.io/sources/ldap/openldap-cn"
269            )
270        )
271        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
272        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
273            user_sync = UserLDAPSynchronizer(self.source, Task())
274            user_sync.sync_full()
275            group_sync = GroupLDAPSynchronizer(self.source, Task())
276            group_sync.sync_full()
277            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
278            membership_sync.sync_full()
279
280            self.assertTrue(
281                User.objects.filter(username="user4_sn").exists(), "User does not exist"
282            )
283            # Test if membership mapping based on memberOf works.
284            memberof_group = Group.objects.filter(name="reverse-lookup-group")
285            self.assertTrue(memberof_group.exists(), "Group does not exist")
286            self.assertTrue(
287                memberof_group.first().users.filter(username="user4_sn").exists(),
288                "User not a member of the group",
289            )
290
291    def test_sync_groups_ad(self):
292        """Test group sync"""
293        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
294        self.source.additional_user_dn = ""
295        self.source.additional_group_dn = ""
296        self.source.save()
297        self.source.user_property_mappings.set(
298            LDAPSourcePropertyMapping.objects.filter(
299                Q(managed__startswith="goauthentik.io/sources/ldap/default")
300                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
301            )
302        )
303        self.source.group_property_mappings.set(
304            LDAPSourcePropertyMapping.objects.filter(
305                managed="goauthentik.io/sources/ldap/default-name"
306            )
307        )
308        connection = MagicMock(return_value=mock_ad_connection())
309        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
310            _user = create_test_admin_user()
311            parent_group = Group.objects.get(name=_user.username)
312            self.source.sync_parent_group = parent_group
313            self.source.save()
314            group_sync = GroupLDAPSynchronizer(self.source, Task())
315            group_sync.sync_full()
316            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
317            membership_sync.sync_full()
318            group: Group = Group.objects.filter(name="Test Group").first()
319            self.assertIsNotNone(group)
320            self.assertEqual(group.parents.first(), parent_group)
321
322    def test_sync_groups_openldap(self):
323        """Test group sync"""
324        self.source.object_uniqueness_field = "uid"
325        self.source.group_object_filter = "(objectClass=groupOfNames)"
326        self.source.user_property_mappings.set(
327            LDAPSourcePropertyMapping.objects.filter(
328                Q(managed__startswith="goauthentik.io/sources/ldap/default")
329                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
330            )
331        )
332        self.source.group_property_mappings.set(
333            LDAPSourcePropertyMapping.objects.filter(
334                managed="goauthentik.io/sources/ldap/openldap-cn"
335            )
336        )
337        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
338        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
339            self.source.save()
340            group_sync = GroupLDAPSynchronizer(self.source, Task())
341            group_sync.sync_full()
342            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
343            membership_sync.sync_full()
344            group = Group.objects.filter(name="group1")
345            self.assertTrue(group.exists())
346
347    def test_sync_groups_openldap_posix_group(self):
348        """Test posix group sync"""
349        self.source.object_uniqueness_field = "cn"
350        self.source.group_membership_field = "memberUid"
351        self.source.user_object_filter = "(objectClass=posixAccount)"
352        self.source.group_object_filter = "(objectClass=posixGroup)"
353        self.source.user_membership_attribute = "uid"
354        self.source.user_property_mappings.set(
355            [
356                *LDAPSourcePropertyMapping.objects.filter(
357                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
358                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
359                ).all(),
360                LDAPSourcePropertyMapping.objects.create(
361                    name="name",
362                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
363                ),
364            ]
365        )
366        self.source.group_property_mappings.set(
367            LDAPSourcePropertyMapping.objects.filter(
368                managed="goauthentik.io/sources/ldap/openldap-cn"
369            )
370        )
371        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
372        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
373            self.source.save()
374            user_sync = UserLDAPSynchronizer(self.source, Task())
375            user_sync.sync_full()
376            group_sync = GroupLDAPSynchronizer(self.source, Task())
377            group_sync.sync_full()
378            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
379            membership_sync.sync_full()
380            # Test if membership mapping based on memberUid works.
381            posix_group = Group.objects.filter(name="group-posix").first()
382            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
383
384    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
385        """Test posix group sync"""
386        self.source.object_uniqueness_field = "cn"
387        self.source.group_membership_field = "memberUid"
388        self.source.user_object_filter = "(objectClass=posixAccount)"
389        self.source.group_object_filter = "(objectClass=posixGroup)"
390        self.source.user_membership_attribute = "cn"
391        self.source.user_property_mappings.set(
392            [
393                *LDAPSourcePropertyMapping.objects.filter(
394                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
395                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
396                ).all(),
397                LDAPSourcePropertyMapping.objects.create(
398                    name="name",
399                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
400                ),
401            ]
402        )
403        self.source.group_property_mappings.set(
404            LDAPSourcePropertyMapping.objects.filter(
405                managed="goauthentik.io/sources/ldap/openldap-cn"
406            )
407        )
408        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
409        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
410            self.source.save()
411            user_sync = UserLDAPSynchronizer(self.source, Task())
412            user_sync.sync_full()
413            group_sync = GroupLDAPSynchronizer(self.source, Task())
414            group_sync.sync_full()
415            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
416            membership_sync.sync_full()
417            # Test if membership mapping based on memberUid works.
418            posix_group = Group.objects.filter(name="group-posix").first()
419            self.assertTrue(posix_group.users.filter(name="user-posix").exists())
420
421    def test_tasks_ad(self):
422        """Test Scheduled tasks"""
423        self.source.user_property_mappings.set(
424            LDAPSourcePropertyMapping.objects.filter(
425                Q(managed__startswith="goauthentik.io/sources/ldap/default")
426                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
427            )
428        )
429        self.source.save()
430        connection = MagicMock(return_value=mock_ad_connection())
431        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
432            ldap_sync.send(self.source.pk)
433
434    def test_tasks_openldap(self):
435        """Test Scheduled tasks"""
436        self.source.object_uniqueness_field = "uid"
437        self.source.group_object_filter = "(objectClass=groupOfNames)"
438        self.source.user_property_mappings.set(
439            LDAPSourcePropertyMapping.objects.filter(
440                Q(managed__startswith="goauthentik.io/sources/ldap/default")
441                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
442            )
443        )
444        self.source.save()
445        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
446        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
447            ldap_sync.send(self.source.pk)
448
449    def test_user_deletion(self):
450        """Test user deletion"""
451        user = User.objects.create_user(username="not-in-the-source")
452        UserLDAPSourceConnection.objects.create(
453            user=user, source=self.source, identifier="not-in-the-source"
454        )
455        self.source.object_uniqueness_field = "uid"
456        self.source.group_object_filter = "(objectClass=groupOfNames)"
457        self.source.delete_not_found_objects = True
458        self.source.save()
459
460        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
461        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
462            ldap_sync.send(self.source.pk)
463        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())
464
465    def test_user_deletion_still_in_source(self):
466        """Test that user is not deleted if it's still in the source"""
467        username = user_in_slapd_cn
468        identifier = user_in_slapd_uid
469        user = User.objects.create_user(username=username)
470        UserLDAPSourceConnection.objects.create(
471            user=user, source=self.source, identifier=identifier
472        )
473        self.source.object_uniqueness_field = "uid"
474        self.source.group_object_filter = "(objectClass=groupOfNames)"
475        self.source.delete_not_found_objects = True
476        self.source.save()
477
478        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
479        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
480            ldap_sync.send(self.source.pk)
481        self.assertTrue(User.objects.filter(username=username).exists())
482
483    def test_user_deletion_no_sync(self):
484        """Test that user is not deleted if sync_users is False"""
485        user = User.objects.create_user(username="not-in-the-source")
486        UserLDAPSourceConnection.objects.create(
487            user=user, source=self.source, identifier="not-in-the-source"
488        )
489        self.source.object_uniqueness_field = "uid"
490        self.source.group_object_filter = "(objectClass=groupOfNames)"
491        self.source.delete_not_found_objects = True
492        self.source.sync_users = False
493        self.source.save()
494
495        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
496        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
497            ldap_sync.send(self.source.pk)
498        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
499
500    def test_user_deletion_no_delete(self):
501        """Test that user is not deleted if delete_not_found_objects is False"""
502        user = User.objects.create_user(username="not-in-the-source")
503        UserLDAPSourceConnection.objects.create(
504            user=user, source=self.source, identifier="not-in-the-source"
505        )
506        self.source.object_uniqueness_field = "uid"
507        self.source.group_object_filter = "(objectClass=groupOfNames)"
508        self.source.save()
509
510        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
511        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
512            ldap_sync.send(self.source.pk)
513        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
514
515    def test_group_deletion(self):
516        """Test group deletion"""
517        group = Group.objects.create(name="not-in-the-source")
518        GroupLDAPSourceConnection.objects.create(
519            group=group, source=self.source, identifier="not-in-the-source"
520        )
521        self.source.object_uniqueness_field = "uid"
522        self.source.group_object_filter = "(objectClass=groupOfNames)"
523        self.source.delete_not_found_objects = True
524        self.source.save()
525
526        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
527        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
528            ldap_sync.send(self.source.pk)
529        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())
530
531    def test_group_deletion_still_in_source(self):
532        """Test that group is not deleted if it's still in the source"""
533        groupname = group_in_slapd_cn
534        identifier = group_in_slapd_uid
535        group = Group.objects.create(name=groupname)
536        GroupLDAPSourceConnection.objects.create(
537            group=group, source=self.source, identifier=identifier
538        )
539        self.source.object_uniqueness_field = "uid"
540        self.source.group_object_filter = "(objectClass=groupOfNames)"
541        self.source.delete_not_found_objects = True
542        self.source.save()
543
544        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
545        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
546            ldap_sync.send(self.source.pk)
547        self.assertTrue(Group.objects.filter(name=groupname).exists())
548
549    def test_group_deletion_no_sync(self):
550        """Test that group is not deleted if sync_groups is False"""
551        group = Group.objects.create(name="not-in-the-source")
552        GroupLDAPSourceConnection.objects.create(
553            group=group, source=self.source, identifier="not-in-the-source"
554        )
555        self.source.object_uniqueness_field = "uid"
556        self.source.group_object_filter = "(objectClass=groupOfNames)"
557        self.source.delete_not_found_objects = True
558        self.source.sync_groups = False
559        self.source.save()
560
561        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
562        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
563            ldap_sync.send(self.source.pk)
564        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
565
566    def test_group_deletion_no_delete(self):
567        """Test that group is not deleted if delete_not_found_objects is False"""
568        group = Group.objects.create(name="not-in-the-source")
569        GroupLDAPSourceConnection.objects.create(
570            group=group, source=self.source, identifier="not-in-the-source"
571        )
572        self.source.object_uniqueness_field = "uid"
573        self.source.group_object_filter = "(objectClass=groupOfNames)"
574        self.source.save()
575
576        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
577        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
578            ldap_sync.send(self.source.pk)
579        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
580
581    def test_batch_deletion(self):
582        """Test batch deletion"""
583        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
584        for i in range(BATCH_SIZE):
585            user = User.objects.create_user(username=f"not-in-the-source-{i}")
586            group = Group.objects.create(name=f"not-in-the-source-{i}")
587            group.users.add(user)
588            UserLDAPSourceConnection.objects.create(
589                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
590            )
591            GroupLDAPSourceConnection.objects.create(
592                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
593            )
594        self.source.object_uniqueness_field = "uid"
595        self.source.group_object_filter = "(objectClass=groupOfNames)"
596        self.source.delete_not_found_objects = True
597        self.source.save()
598
599        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
600        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
601            ldap_sync.send(self.source.pk)
602
603        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
604        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())
605
606    def test_membership_sync_special_chars_in_group_dn(self):
607        """Test membership synchronization with special characters in group DN"""
608        self.source.object_uniqueness_field = "uid"
609        self.source.group_object_filter = "(objectClass=groupOfNames)"
610        self.source.lookup_groups_from_user = True
611        self.source.group_membership_field = "memberOf"
612
613        # Mock connection with group DN containing special characters
614        mock_conn = MagicMock()
615
616        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
617        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
618        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
619        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
620
621        # Mock the paged_search method that would be called with the filter
622        mock_standard = MagicMock()
623        mock_conn.extend.standard = mock_standard
624
625        # Test case 1: Group DN with parentheses
626        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
627            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
628
629            # Simulate group data with special characters in DN
630            page_data = [{"dn": special_group_dn}]
631
632            # This should not raise LDAPInvalidFilterError anymore
633            try:
634                membership_sync.sync(page_data)
635                # Verify that the filter was properly escaped
636                # The call should have been made with escaped characters
637                mock_standard.paged_search.assert_called()
638                call_args = mock_standard.paged_search.call_args
639                search_filter = call_args[1]["search_filter"]
640                # The parentheses should be escaped as \28 and \29
641                self.assertIn("\\28", search_filter)  # Escaped (
642                self.assertIn("\\29", search_filter)  # Escaped )
643            except LDAPInvalidFilterError:
644                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
645
646        # Test case 2: Group DN with backslashes
647        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
648            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
649            page_data = [{"dn": backslash_group_dn}]
650
651            try:
652                membership_sync.sync(page_data)
653                call_args = mock_standard.paged_search.call_args
654                search_filter = call_args[1]["search_filter"]
655                # The backslash should be escaped as \5c
656                self.assertIn("\\5c", search_filter)  # Escaped \
657            except LDAPInvalidFilterError:
658                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
659
660        # Test case 3: Group DN with asterisks
661        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
662            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
663            page_data = [{"dn": asterisk_group_dn}]
664
665            try:
666                membership_sync.sync(page_data)
667                call_args = mock_standard.paged_search.call_args
668                search_filter = call_args[1]["search_filter"]
669                # The asterisk should be escaped as \2a
670                self.assertIn("\\2a", search_filter)  # Escaped *
671            except LDAPInvalidFilterError:
672                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
673
674    def test_escape_filter_chars_function(self):
675        """Test the escape_filter_chars function directly"""
676
677        # Test various special characters that need escaping
678        test_cases = [
679            ("test(group)", "test\\28group\\29"),  # parentheses
680            ("test\\group", "test\\5cgroup"),  # backslash
681            ("test*group", "test\\2agroup"),  # asterisk
682            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
683            ("normalgroup", "normalgroup"),  # no special chars
684            ("", ""),  # empty string
685        ]
686
687        for input_str, expected in test_cases:
688            with self.subTest(input_str=input_str):
689                result = escape_filter_chars(input_str)
690                self.assertEqual(result, expected)

LDAP Sync tests

@apply_blueprint('system/sources-ldap.yaml')
def setUp(self):
46    @apply_blueprint("system/sources-ldap.yaml")
47    def setUp(self):
48        self.source: LDAPSource = LDAPSource.objects.create(
49            name="ldap",
50            slug="ldap",
51            base_dn="dc=goauthentik,dc=io",
52            additional_user_dn="ou=users",
53            additional_group_dn="ou=groups",
54        )

Hook method for setting up the test fixture before exercising it.

def test_sync_missing_page(self):
56    def test_sync_missing_page(self):
57        """Test sync with missing page"""
58        connection = MagicMock(return_value=mock_ad_connection())
59        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
60            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")

Test sync with missing page

def test_sync_error(self):
62    def test_sync_error(self):
63        """Test user sync"""
64        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
65        self.source.additional_user_dn = ""
66        self.source.additional_group_dn = ""
67        self.source.save()
68        self.source.user_property_mappings.set(
69            LDAPSourcePropertyMapping.objects.filter(
70                Q(managed__startswith="goauthentik.io/sources/ldap/default")
71                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
72            )
73        )
74        mapping = LDAPSourcePropertyMapping.objects.create(
75            name="name",
76            expression="q",
77        )
78        self.source.user_property_mappings.set([mapping])
79        self.source.save()
80        connection = MagicMock(return_value=mock_ad_connection())
81        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
82            user_sync = UserLDAPSynchronizer(self.source, Task())
83            with self.assertRaises(StopSync):
84                user_sync.sync_full()
85            self.assertFalse(User.objects.filter(username="user0_sn").exists())
86            self.assertFalse(User.objects.filter(username="user1_sn").exists())
87        events = Event.objects.filter(
88            action=EventAction.CONFIGURATION_ERROR,
89            context__message="Failed to evaluate property mapping: 'name'",
90            context__mapping__pk=mapping.pk.hex,
91        )
92        self.assertTrue(events.exists())

Test user sync

def test_sync_mapping(self):
 94    def test_sync_mapping(self):
 95        """Test property mappings"""
 96        none = LDAPSourcePropertyMapping.objects.create(
 97            name=generate_id(), expression="return None"
 98        )
 99        byte_mapping = LDAPSourcePropertyMapping.objects.create(
100            name=generate_id(), expression="return b''"
101        )
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(managed__startswith="goauthentik.io/sources/ldap/default")
105                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
106            )
107        )
108        self.source.user_property_mappings.add(none, byte_mapping)
109        connection = MagicMock(return_value=mock_ad_connection())
110
111        # we basically just test that the mappings don't throw errors
112        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
113            user_sync = UserLDAPSynchronizer(self.source, Task())
114            user_sync.sync_full()

Test property mappings

def test_sync_users_ad(self):
116    def test_sync_users_ad(self):
117        """Test user sync"""
118        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
119        self.source.additional_user_dn = ""
120        self.source.additional_group_dn = ""
121        self.source.save()
122        self.source.user_property_mappings.set(
123            LDAPSourcePropertyMapping.objects.filter(
124                Q(managed__startswith="goauthentik.io/sources/ldap/default")
125                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
126            )
127        )
128        connection = MagicMock(return_value=mock_ad_connection())
129
130        # Create the user beforehand so we can set attributes and check they aren't removed
131        user = User.objects.create(
132            username="erin.h",
133            attributes={
134                "foo": "bar",
135            },
136        )
137        UserLDAPSourceConnection.objects.create(
138            user=user,
139            source=self.source,
140            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
141        )
142
143        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
144            user_sync = UserLDAPSynchronizer(self.source, Task())
145            user_sync.sync_full()
146
147            user.refresh_from_db()
148            self.assertEqual(user.name, "Erin M. Hagens")
149            self.assertEqual(user.attributes["foo"], "bar")
150            self.assertTrue(user.is_active)
151            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
152
153            deactivated = User.objects.filter(username="deactivated.a").first()
154            self.assertIsNotNone(deactivated)
155            self.assertFalse(deactivated.is_active)

Test user sync

def test_sync_ad_legacy(self):
157    def test_sync_ad_legacy(self):
158        """Test user sync"""
159        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
160        self.source.additional_user_dn = ""
161        self.source.additional_group_dn = ""
162        self.source.save()
163        self.source.user_property_mappings.set(
164            LDAPSourcePropertyMapping.objects.filter(
165                Q(managed__startswith="goauthentik.io/sources/ldap/default")
166                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
167            )
168        )
169        self.source.group_property_mappings.set(
170            LDAPSourcePropertyMapping.objects.filter(
171                managed="goauthentik.io/sources/ldap/default-name"
172            )
173        )
174        connection = MagicMock(return_value=mock_ad_connection())
175
176        # Create the user beforehand so we can set attributes and check they aren't removed
177        user = User.objects.create(
178            username="erin.h",
179            attributes={
180                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
181                "foo": "bar",
182            },
183        )
184        group = Group.objects.create(
185            name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
186        )
187
188        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
189            user_sync = UserLDAPSynchronizer(self.source, Task())
190            user_sync.sync_full()
191            group_sync = GroupLDAPSynchronizer(self.source, Task())
192            group_sync.sync_full()
193
194            user.refresh_from_db()
195            group.refresh_from_db()
196
197            self.assertEqual(user.name, "Erin M. Hagens")
198            self.assertEqual(user.attributes["foo"], "bar")
199            self.assertTrue(user.is_active)
200            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
201            self.assertTrue(
202                UserLDAPSourceConnection.objects.filter(
203                    source=self.source,
204                    user=user,
205                    identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
206                ).exists()
207            )
208
209            deactivated = User.objects.filter(username="deactivated.a").first()
210            self.assertIsNotNone(deactivated)
211            self.assertFalse(deactivated.is_active)
212
213            self.assertEqual(group.name, "Administrators")
214            self.assertTrue(
215                GroupLDAPSourceConnection.objects.filter(
216                    source=self.source, group=group, identifier="S-1-5-32-544"
217                ).exists()
218            )
219            self.assertEqual(group.attributes["foo"], "bar")

Test user sync

def test_sync_users_openldap(self):
221    def test_sync_users_openldap(self):
222        """Test user sync"""
223        self.source.object_uniqueness_field = "uid"
224        self.source.user_property_mappings.set(
225            LDAPSourcePropertyMapping.objects.filter(
226                Q(managed__startswith="goauthentik.io/sources/ldap/default")
227                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
228            )
229        )
230        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
231        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
232            user_sync = UserLDAPSynchronizer(self.source, Task())
233            user_sync.sync_full()
234            self.assertTrue(User.objects.filter(username="user0_sn").exists())
235            self.assertFalse(User.objects.filter(username="user1_sn").exists())

Test user sync

def test_sync_users_freeipa_ish(self):
237    def test_sync_users_freeipa_ish(self):
238        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
239        self.source.object_uniqueness_field = "uid"
240        self.source.user_property_mappings.set(
241            LDAPSourcePropertyMapping.objects.filter(
242                Q(managed__startswith="goauthentik.io/sources/ldap/default")
243                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
244            )
245        )
246        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
247        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
248            user_sync = UserLDAPSynchronizer(self.source, Task())
249            user_sync.sync_full()
250            self.assertTrue(User.objects.filter(username="user0_sn").exists())
251            self.assertFalse(User.objects.filter(username="user1_sn").exists())
252            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)

Test user sync (FreeIPA-ish), mainly testing vendor quirks

def test_sync_groups_freeipa_memberOf(self):
254    def test_sync_groups_freeipa_memberOf(self):
255        """Test group sync when membership is derived from memberOf user attribute"""
256        self.source.object_uniqueness_field = "uid"
257        self.source.group_object_filter = "(objectClass=groupOfNames)"
258        self.source.lookup_groups_from_user = True
259        self.source.group_membership_field = "memberOf"
260        self.source.user_property_mappings.set(
261            LDAPSourcePropertyMapping.objects.filter(
262                Q(managed__startswith="goauthentik.io/sources/ldap/default")
263                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
264            )
265        )
266        self.source.group_property_mappings.set(
267            LDAPSourcePropertyMapping.objects.filter(
268                managed="goauthentik.io/sources/ldap/openldap-cn"
269            )
270        )
271        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
272        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
273            user_sync = UserLDAPSynchronizer(self.source, Task())
274            user_sync.sync_full()
275            group_sync = GroupLDAPSynchronizer(self.source, Task())
276            group_sync.sync_full()
277            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
278            membership_sync.sync_full()
279
280            self.assertTrue(
281                User.objects.filter(username="user4_sn").exists(), "User does not exist"
282            )
283            # Test if membership mapping based on memberOf works.
284            memberof_group = Group.objects.filter(name="reverse-lookup-group")
285            self.assertTrue(memberof_group.exists(), "Group does not exist")
286            self.assertTrue(
287                memberof_group.first().users.filter(username="user4_sn").exists(),
288                "User not a member of the group",
289            )

Test group sync when membership is derived from memberOf user attribute

def test_sync_groups_ad(self):
291    def test_sync_groups_ad(self):
292        """Test group sync"""
293        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
294        self.source.additional_user_dn = ""
295        self.source.additional_group_dn = ""
296        self.source.save()
297        self.source.user_property_mappings.set(
298            LDAPSourcePropertyMapping.objects.filter(
299                Q(managed__startswith="goauthentik.io/sources/ldap/default")
300                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
301            )
302        )
303        self.source.group_property_mappings.set(
304            LDAPSourcePropertyMapping.objects.filter(
305                managed="goauthentik.io/sources/ldap/default-name"
306            )
307        )
308        connection = MagicMock(return_value=mock_ad_connection())
309        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
310            _user = create_test_admin_user()
311            parent_group = Group.objects.get(name=_user.username)
312            self.source.sync_parent_group = parent_group
313            self.source.save()
314            group_sync = GroupLDAPSynchronizer(self.source, Task())
315            group_sync.sync_full()
316            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
317            membership_sync.sync_full()
318            group: Group = Group.objects.filter(name="Test Group").first()
319            self.assertIsNotNone(group)
320            self.assertEqual(group.parents.first(), parent_group)

Test group sync

def test_sync_groups_openldap(self):
322    def test_sync_groups_openldap(self):
323        """Test group sync"""
324        self.source.object_uniqueness_field = "uid"
325        self.source.group_object_filter = "(objectClass=groupOfNames)"
326        self.source.user_property_mappings.set(
327            LDAPSourcePropertyMapping.objects.filter(
328                Q(managed__startswith="goauthentik.io/sources/ldap/default")
329                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
330            )
331        )
332        self.source.group_property_mappings.set(
333            LDAPSourcePropertyMapping.objects.filter(
334                managed="goauthentik.io/sources/ldap/openldap-cn"
335            )
336        )
337        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
338        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
339            self.source.save()
340            group_sync = GroupLDAPSynchronizer(self.source, Task())
341            group_sync.sync_full()
342            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
343            membership_sync.sync_full()
344            group = Group.objects.filter(name="group1")
345            self.assertTrue(group.exists())

Test group sync

def test_sync_groups_openldap_posix_group(self):
347    def test_sync_groups_openldap_posix_group(self):
348        """Test posix group sync"""
349        self.source.object_uniqueness_field = "cn"
350        self.source.group_membership_field = "memberUid"
351        self.source.user_object_filter = "(objectClass=posixAccount)"
352        self.source.group_object_filter = "(objectClass=posixGroup)"
353        self.source.user_membership_attribute = "uid"
354        self.source.user_property_mappings.set(
355            [
356                *LDAPSourcePropertyMapping.objects.filter(
357                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
358                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
359                ).all(),
360                LDAPSourcePropertyMapping.objects.create(
361                    name="name",
362                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
363                ),
364            ]
365        )
366        self.source.group_property_mappings.set(
367            LDAPSourcePropertyMapping.objects.filter(
368                managed="goauthentik.io/sources/ldap/openldap-cn"
369            )
370        )
371        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
372        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
373            self.source.save()
374            user_sync = UserLDAPSynchronizer(self.source, Task())
375            user_sync.sync_full()
376            group_sync = GroupLDAPSynchronizer(self.source, Task())
377            group_sync.sync_full()
378            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
379            membership_sync.sync_full()
380            # Test if membership mapping based on memberUid works.
381            posix_group = Group.objects.filter(name="group-posix").first()
382            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

Test posix group sync

def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
384    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
385        """Test posix group sync"""
386        self.source.object_uniqueness_field = "cn"
387        self.source.group_membership_field = "memberUid"
388        self.source.user_object_filter = "(objectClass=posixAccount)"
389        self.source.group_object_filter = "(objectClass=posixGroup)"
390        self.source.user_membership_attribute = "cn"
391        self.source.user_property_mappings.set(
392            [
393                *LDAPSourcePropertyMapping.objects.filter(
394                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
395                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
396                ).all(),
397                LDAPSourcePropertyMapping.objects.create(
398                    name="name",
399                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
400                ),
401            ]
402        )
403        self.source.group_property_mappings.set(
404            LDAPSourcePropertyMapping.objects.filter(
405                managed="goauthentik.io/sources/ldap/openldap-cn"
406            )
407        )
408        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
409        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
410            self.source.save()
411            user_sync = UserLDAPSynchronizer(self.source, Task())
412            user_sync.sync_full()
413            group_sync = GroupLDAPSynchronizer(self.source, Task())
414            group_sync.sync_full()
415            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
416            membership_sync.sync_full()
417            # Test if membership mapping based on memberUid works.
418            posix_group = Group.objects.filter(name="group-posix").first()
419            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

Test posix group sync

def test_tasks_ad(self):
421    def test_tasks_ad(self):
422        """Test Scheduled tasks"""
423        self.source.user_property_mappings.set(
424            LDAPSourcePropertyMapping.objects.filter(
425                Q(managed__startswith="goauthentik.io/sources/ldap/default")
426                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
427            )
428        )
429        self.source.save()
430        connection = MagicMock(return_value=mock_ad_connection())
431        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
432            ldap_sync.send(self.source.pk)

Test Scheduled tasks

def test_tasks_openldap(self):
434    def test_tasks_openldap(self):
435        """Test Scheduled tasks"""
436        self.source.object_uniqueness_field = "uid"
437        self.source.group_object_filter = "(objectClass=groupOfNames)"
438        self.source.user_property_mappings.set(
439            LDAPSourcePropertyMapping.objects.filter(
440                Q(managed__startswith="goauthentik.io/sources/ldap/default")
441                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
442            )
443        )
444        self.source.save()
445        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
446        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
447            ldap_sync.send(self.source.pk)

Test Scheduled tasks

def test_user_deletion(self):
449    def test_user_deletion(self):
450        """Test user deletion"""
451        user = User.objects.create_user(username="not-in-the-source")
452        UserLDAPSourceConnection.objects.create(
453            user=user, source=self.source, identifier="not-in-the-source"
454        )
455        self.source.object_uniqueness_field = "uid"
456        self.source.group_object_filter = "(objectClass=groupOfNames)"
457        self.source.delete_not_found_objects = True
458        self.source.save()
459
460        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
461        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
462            ldap_sync.send(self.source.pk)
463        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())

Test user deletion

def test_user_deletion_still_in_source(self):
465    def test_user_deletion_still_in_source(self):
466        """Test that user is not deleted if it's still in the source"""
467        username = user_in_slapd_cn
468        identifier = user_in_slapd_uid
469        user = User.objects.create_user(username=username)
470        UserLDAPSourceConnection.objects.create(
471            user=user, source=self.source, identifier=identifier
472        )
473        self.source.object_uniqueness_field = "uid"
474        self.source.group_object_filter = "(objectClass=groupOfNames)"
475        self.source.delete_not_found_objects = True
476        self.source.save()
477
478        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
479        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
480            ldap_sync.send(self.source.pk)
481        self.assertTrue(User.objects.filter(username=username).exists())

Test that user is not deleted if it's still in the source

def test_user_deletion_no_sync(self):
483    def test_user_deletion_no_sync(self):
484        """Test that user is not deleted if sync_users is False"""
485        user = User.objects.create_user(username="not-in-the-source")
486        UserLDAPSourceConnection.objects.create(
487            user=user, source=self.source, identifier="not-in-the-source"
488        )
489        self.source.object_uniqueness_field = "uid"
490        self.source.group_object_filter = "(objectClass=groupOfNames)"
491        self.source.delete_not_found_objects = True
492        self.source.sync_users = False
493        self.source.save()
494
495        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
496        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
497            ldap_sync.send(self.source.pk)
498        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

Test that user is not deleted if sync_users is False

def test_user_deletion_no_delete(self):
500    def test_user_deletion_no_delete(self):
501        """Test that user is not deleted if delete_not_found_objects is False"""
502        user = User.objects.create_user(username="not-in-the-source")
503        UserLDAPSourceConnection.objects.create(
504            user=user, source=self.source, identifier="not-in-the-source"
505        )
506        self.source.object_uniqueness_field = "uid"
507        self.source.group_object_filter = "(objectClass=groupOfNames)"
508        self.source.save()
509
510        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
511        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
512            ldap_sync.send(self.source.pk)
513        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

Test that user is not deleted if delete_not_found_objects is False

def test_group_deletion(self):
515    def test_group_deletion(self):
516        """Test group deletion"""
517        group = Group.objects.create(name="not-in-the-source")
518        GroupLDAPSourceConnection.objects.create(
519            group=group, source=self.source, identifier="not-in-the-source"
520        )
521        self.source.object_uniqueness_field = "uid"
522        self.source.group_object_filter = "(objectClass=groupOfNames)"
523        self.source.delete_not_found_objects = True
524        self.source.save()
525
526        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
527        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
528            ldap_sync.send(self.source.pk)
529        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())

Test group deletion

def test_group_deletion_still_in_source(self):
531    def test_group_deletion_still_in_source(self):
532        """Test that group is not deleted if it's still in the source"""
533        groupname = group_in_slapd_cn
534        identifier = group_in_slapd_uid
535        group = Group.objects.create(name=groupname)
536        GroupLDAPSourceConnection.objects.create(
537            group=group, source=self.source, identifier=identifier
538        )
539        self.source.object_uniqueness_field = "uid"
540        self.source.group_object_filter = "(objectClass=groupOfNames)"
541        self.source.delete_not_found_objects = True
542        self.source.save()
543
544        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
545        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
546            ldap_sync.send(self.source.pk)
547        self.assertTrue(Group.objects.filter(name=groupname).exists())

Test that group is not deleted if it's still in the source

def test_group_deletion_no_sync(self):
549    def test_group_deletion_no_sync(self):
550        """Test that group is not deleted if sync_groups is False"""
551        group = Group.objects.create(name="not-in-the-source")
552        GroupLDAPSourceConnection.objects.create(
553            group=group, source=self.source, identifier="not-in-the-source"
554        )
555        self.source.object_uniqueness_field = "uid"
556        self.source.group_object_filter = "(objectClass=groupOfNames)"
557        self.source.delete_not_found_objects = True
558        self.source.sync_groups = False
559        self.source.save()
560
561        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
562        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
563            ldap_sync.send(self.source.pk)
564        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

Test that group is not deleted if sync_groups is False

def test_group_deletion_no_delete(self):
566    def test_group_deletion_no_delete(self):
567        """Test that group is not deleted if delete_not_found_objects is False"""
568        group = Group.objects.create(name="not-in-the-source")
569        GroupLDAPSourceConnection.objects.create(
570            group=group, source=self.source, identifier="not-in-the-source"
571        )
572        self.source.object_uniqueness_field = "uid"
573        self.source.group_object_filter = "(objectClass=groupOfNames)"
574        self.source.save()
575
576        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
577        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
578            ldap_sync.send(self.source.pk)
579        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

Test that group is not deleted if delete_not_found_objects is False

def test_batch_deletion(self):
581    def test_batch_deletion(self):
582        """Test batch deletion"""
583        BATCH_SIZE = DELETE_CHUNK_SIZE + 1
584        for i in range(BATCH_SIZE):
585            user = User.objects.create_user(username=f"not-in-the-source-{i}")
586            group = Group.objects.create(name=f"not-in-the-source-{i}")
587            group.users.add(user)
588            UserLDAPSourceConnection.objects.create(
589                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
590            )
591            GroupLDAPSourceConnection.objects.create(
592                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
593            )
594        self.source.object_uniqueness_field = "uid"
595        self.source.group_object_filter = "(objectClass=groupOfNames)"
596        self.source.delete_not_found_objects = True
597        self.source.save()
598
599        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
600        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
601            ldap_sync.send(self.source.pk)
602
603        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
604        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())

Test batch deletion

def test_membership_sync_special_chars_in_group_dn(self):
606    def test_membership_sync_special_chars_in_group_dn(self):
607        """Test membership synchronization with special characters in group DN"""
608        self.source.object_uniqueness_field = "uid"
609        self.source.group_object_filter = "(objectClass=groupOfNames)"
610        self.source.lookup_groups_from_user = True
611        self.source.group_membership_field = "memberOf"
612
613        # Mock connection with group DN containing special characters
614        mock_conn = MagicMock()
615
616        # Simulate group with special characters in DN: parentheses, backslashes, asterisks
617        special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com"
618        backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com"
619        asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com"
620
621        # Mock the paged_search method that would be called with the filter
622        mock_standard = MagicMock()
623        mock_conn.extend.standard = mock_standard
624
625        # Test case 1: Group DN with parentheses
626        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
627            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
628
629            # Simulate group data with special characters in DN
630            page_data = [{"dn": special_group_dn}]
631
632            # This should not raise LDAPInvalidFilterError anymore
633            try:
634                membership_sync.sync(page_data)
635                # Verify that the filter was properly escaped
636                # The call should have been made with escaped characters
637                mock_standard.paged_search.assert_called()
638                call_args = mock_standard.paged_search.call_args
639                search_filter = call_args[1]["search_filter"]
640                # The parentheses should be escaped as \28 and \29
641                self.assertIn("\\28", search_filter)  # Escaped (
642                self.assertIn("\\29", search_filter)  # Escaped )
643            except LDAPInvalidFilterError:
644                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
645
646        # Test case 2: Group DN with backslashes
647        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
648            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
649            page_data = [{"dn": backslash_group_dn}]
650
651            try:
652                membership_sync.sync(page_data)
653                call_args = mock_standard.paged_search.call_args
654                search_filter = call_args[1]["search_filter"]
655                # The backslash should be escaped as \5c
656                self.assertIn("\\5c", search_filter)  # Escaped \
657            except LDAPInvalidFilterError:
658                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
659
660        # Test case 3: Group DN with asterisks
661        with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn):
662            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
663            page_data = [{"dn": asterisk_group_dn}]
664
665            try:
666                membership_sync.sync(page_data)
667                call_args = mock_standard.paged_search.call_args
668                search_filter = call_args[1]["search_filter"]
669                # The asterisk should be escaped as \2a
670                self.assertIn("\\2a", search_filter)  # Escaped *
671            except LDAPInvalidFilterError:
672                self.fail("LDAPInvalidFilterError should not be raised with escaped filter")

Test membership synchronization with special characters in group DN

def test_escape_filter_chars_function(self):
674    def test_escape_filter_chars_function(self):
675        """Test the escape_filter_chars function directly"""
676
677        # Test various special characters that need escaping
678        test_cases = [
679            ("test(group)", "test\\28group\\29"),  # parentheses
680            ("test\\group", "test\\5cgroup"),  # backslash
681            ("test*group", "test\\2agroup"),  # asterisk
682            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
683            ("normalgroup", "normalgroup"),  # no special chars
684            ("", ""),  # empty string
685        ]
686
687        for input_str, expected in test_cases:
688            with self.subTest(input_str=input_str):
689                result = escape_filter_chars(input_str)
690                self.assertEqual(result, expected)

Test the escape_filter_chars function directly