authentik.sources.ldap.tests.test_sync
LDAP Source tests
1"""LDAP Source tests""" 2 3from unittest.mock import MagicMock, patch 4 5from django.db.models import Q 6from django.test import TestCase 7from ldap3.core.exceptions import LDAPInvalidFilterError 8from ldap3.utils.conv import escape_filter_chars 9 10from authentik.blueprints.tests import apply_blueprint 11from authentik.core.models import Group, User 12from authentik.core.tests.utils import create_test_admin_user 13from authentik.events.models import Event, EventAction 14from authentik.lib.generators import generate_id, generate_key 15from authentik.lib.sync.outgoing.exceptions import StopSync 16from authentik.lib.utils.reflection import class_to_path 17from authentik.sources.ldap.models import ( 18 GroupLDAPSourceConnection, 19 LDAPSource, 20 LDAPSourcePropertyMapping, 21 UserLDAPSourceConnection, 22) 23from authentik.sources.ldap.sync.forward_delete_users import DELETE_CHUNK_SIZE 24from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer 25from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer 26from authentik.sources.ldap.sync.users import UserLDAPSynchronizer 27from authentik.sources.ldap.tasks import ldap_sync, ldap_sync_page 28from authentik.sources.ldap.tests.mock_ad import mock_ad_connection 29from authentik.sources.ldap.tests.mock_freeipa import mock_freeipa_connection 30from authentik.sources.ldap.tests.mock_slapd import ( 31 group_in_slapd_cn, 32 group_in_slapd_uid, 33 mock_slapd_connection, 34 user_in_slapd_cn, 35 user_in_slapd_uid, 36) 37from authentik.tasks.models import Task 38 39LDAP_PASSWORD = generate_key() 40 41 42class LDAPSyncTests(TestCase): 43 """LDAP Sync tests""" 44 45 @apply_blueprint("system/sources-ldap.yaml") 46 def setUp(self): 47 self.source: LDAPSource = LDAPSource.objects.create( 48 name="ldap", 49 slug="ldap", 50 base_dn="dc=goauthentik,dc=io", 51 additional_user_dn="ou=users", 52 additional_group_dn="ou=groups", 53 ) 54 55 def test_sync_missing_page(self): 56 """Test sync with missing page""" 57 connection = MagicMock(return_value=mock_ad_connection()) 58 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 59 ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo") 60 61 def test_sync_error(self): 62 """Test user sync""" 63 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 64 self.source.additional_user_dn = "" 65 self.source.additional_group_dn = "" 66 self.source.save() 67 self.source.user_property_mappings.set( 68 LDAPSourcePropertyMapping.objects.filter( 69 Q(managed__startswith="goauthentik.io/sources/ldap/default") 70 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 71 ) 72 ) 73 mapping = LDAPSourcePropertyMapping.objects.create( 74 name="name", 75 expression="q", 76 ) 77 self.source.user_property_mappings.set([mapping]) 78 self.source.save() 79 connection = MagicMock(return_value=mock_ad_connection()) 80 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 81 user_sync = UserLDAPSynchronizer(self.source, Task()) 82 with self.assertRaises(StopSync): 83 user_sync.sync_full() 84 self.assertFalse(User.objects.filter(username="user0_sn").exists()) 85 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 86 events = Event.objects.filter( 87 action=EventAction.CONFIGURATION_ERROR, 88 context__message="Failed to evaluate property mapping: 'name'", 89 context__mapping__pk=mapping.pk.hex, 90 ) 91 self.assertTrue(events.exists()) 92 93 def test_sync_mapping(self): 94 """Test property mappings""" 95 none = LDAPSourcePropertyMapping.objects.create( 96 name=generate_id(), expression="return None" 97 ) 98 byte_mapping = LDAPSourcePropertyMapping.objects.create( 99 name=generate_id(), expression="return b''" 100 ) 101 self.source.user_property_mappings.set( 102 LDAPSourcePropertyMapping.objects.filter( 103 Q(managed__startswith="goauthentik.io/sources/ldap/default") 104 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 105 ) 106 ) 107 self.source.user_property_mappings.add(none, byte_mapping) 108 connection = MagicMock(return_value=mock_ad_connection()) 109 110 # we basically just test that the mappings don't throw errors 111 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 112 user_sync = UserLDAPSynchronizer(self.source, Task()) 113 user_sync.sync_full() 114 115 def test_sync_users_ad(self): 116 """Test user sync""" 117 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 118 self.source.additional_user_dn = "" 119 self.source.additional_group_dn = "" 120 self.source.save() 121 self.source.user_property_mappings.set( 122 LDAPSourcePropertyMapping.objects.filter( 123 Q(managed__startswith="goauthentik.io/sources/ldap/default") 124 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 125 ) 126 ) 127 connection = MagicMock(return_value=mock_ad_connection()) 128 129 # Create the user beforehand so we can set attributes and check they aren't removed 130 user = User.objects.create( 131 username="erin.h", 132 attributes={ 133 "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114", 134 "foo": "bar", 135 }, 136 ) 137 138 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 139 user_sync = UserLDAPSynchronizer(self.source, Task()) 140 user_sync.sync_full() 141 142 user.refresh_from_db() 143 self.assertEqual(user.name, "Erin M. Hagens") 144 self.assertEqual(user.attributes["foo"], "bar") 145 self.assertTrue(user.is_active) 146 self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test") 147 148 deactivated = User.objects.filter(username="deactivated.a").first() 149 self.assertIsNotNone(deactivated) 150 self.assertFalse(deactivated.is_active) 151 152 def test_sync_users_openldap(self): 153 """Test user sync""" 154 self.source.object_uniqueness_field = "uid" 155 self.source.user_property_mappings.set( 156 LDAPSourcePropertyMapping.objects.filter( 157 Q(managed__startswith="goauthentik.io/sources/ldap/default") 158 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 159 ) 160 ) 161 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 162 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 163 user_sync = UserLDAPSynchronizer(self.source, Task()) 164 user_sync.sync_full() 165 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 166 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 167 168 def test_sync_users_freeipa_ish(self): 169 """Test user sync (FreeIPA-ish), mainly testing vendor quirks""" 170 self.source.object_uniqueness_field = "uid" 171 self.source.user_property_mappings.set( 172 LDAPSourcePropertyMapping.objects.filter( 173 Q(managed__startswith="goauthentik.io/sources/ldap/default") 174 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 175 ) 176 ) 177 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 178 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 179 user_sync = UserLDAPSynchronizer(self.source, Task()) 180 user_sync.sync_full() 181 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 182 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 183 self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active) 184 185 def test_sync_groups_freeipa_memberOf(self): 186 """Test group sync when membership is derived from memberOf user attribute""" 187 self.source.object_uniqueness_field = "uid" 188 self.source.group_object_filter = "(objectClass=groupOfNames)" 189 self.source.lookup_groups_from_user = True 190 self.source.group_membership_field = "memberOf" 191 self.source.user_property_mappings.set( 192 LDAPSourcePropertyMapping.objects.filter( 193 Q(managed__startswith="goauthentik.io/sources/ldap/default") 194 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 195 ) 196 ) 197 self.source.group_property_mappings.set( 198 LDAPSourcePropertyMapping.objects.filter( 199 managed="goauthentik.io/sources/ldap/openldap-cn" 200 ) 201 ) 202 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 203 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 204 user_sync = UserLDAPSynchronizer(self.source, Task()) 205 user_sync.sync_full() 206 group_sync = GroupLDAPSynchronizer(self.source, Task()) 207 group_sync.sync_full() 208 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 209 membership_sync.sync_full() 210 211 self.assertTrue( 212 User.objects.filter(username="user4_sn").exists(), "User does not exist" 213 ) 214 # Test if membership mapping based on memberOf works. 215 memberof_group = Group.objects.filter(name="reverse-lookup-group") 216 self.assertTrue(memberof_group.exists(), "Group does not exist") 217 self.assertTrue( 218 memberof_group.first().users.filter(username="user4_sn").exists(), 219 "User not a member of the group", 220 ) 221 222 def test_sync_groups_ad(self): 223 """Test group sync""" 224 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 225 self.source.additional_user_dn = "" 226 self.source.additional_group_dn = "" 227 self.source.save() 228 self.source.user_property_mappings.set( 229 LDAPSourcePropertyMapping.objects.filter( 230 Q(managed__startswith="goauthentik.io/sources/ldap/default") 231 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 232 ) 233 ) 234 self.source.group_property_mappings.set( 235 LDAPSourcePropertyMapping.objects.filter( 236 managed="goauthentik.io/sources/ldap/default-name" 237 ) 238 ) 239 connection = MagicMock(return_value=mock_ad_connection()) 240 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 241 _user = create_test_admin_user() 242 parent_group = Group.objects.get(name=_user.username) 243 self.source.sync_parent_group = parent_group 244 self.source.save() 245 group_sync = GroupLDAPSynchronizer(self.source, Task()) 246 group_sync.sync_full() 247 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 248 membership_sync.sync_full() 249 group: Group = Group.objects.filter(name="Test Group").first() 250 self.assertIsNotNone(group) 251 self.assertEqual(group.parents.first(), parent_group) 252 253 def test_sync_groups_openldap(self): 254 """Test group sync""" 255 self.source.object_uniqueness_field = "uid" 256 self.source.group_object_filter = "(objectClass=groupOfNames)" 257 self.source.user_property_mappings.set( 258 LDAPSourcePropertyMapping.objects.filter( 259 Q(managed__startswith="goauthentik.io/sources/ldap/default") 260 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 261 ) 262 ) 263 self.source.group_property_mappings.set( 264 LDAPSourcePropertyMapping.objects.filter( 265 managed="goauthentik.io/sources/ldap/openldap-cn" 266 ) 267 ) 268 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 269 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 270 self.source.save() 271 group_sync = GroupLDAPSynchronizer(self.source, Task()) 272 group_sync.sync_full() 273 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 274 membership_sync.sync_full() 275 group = Group.objects.filter(name="group1") 276 self.assertTrue(group.exists()) 277 278 def test_sync_groups_openldap_posix_group(self): 279 """Test posix group sync""" 280 self.source.object_uniqueness_field = "cn" 281 self.source.group_membership_field = "memberUid" 282 self.source.user_object_filter = "(objectClass=posixAccount)" 283 self.source.group_object_filter = "(objectClass=posixGroup)" 284 self.source.user_membership_attribute = "uid" 285 self.source.user_property_mappings.set( 286 [ 287 *LDAPSourcePropertyMapping.objects.filter( 288 Q(managed__startswith="goauthentik.io/sources/ldap/default") 289 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 290 ).all(), 291 LDAPSourcePropertyMapping.objects.create( 292 name="name", 293 expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}', 294 ), 295 ] 296 ) 297 self.source.group_property_mappings.set( 298 LDAPSourcePropertyMapping.objects.filter( 299 managed="goauthentik.io/sources/ldap/openldap-cn" 300 ) 301 ) 302 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 303 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 304 self.source.save() 305 user_sync = UserLDAPSynchronizer(self.source, Task()) 306 user_sync.sync_full() 307 group_sync = GroupLDAPSynchronizer(self.source, Task()) 308 group_sync.sync_full() 309 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 310 membership_sync.sync_full() 311 # Test if membership mapping based on memberUid works. 312 posix_group = Group.objects.filter(name="group-posix").first() 313 self.assertTrue(posix_group.users.filter(name="user-posix").exists()) 314 315 def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self): 316 """Test posix group sync""" 317 self.source.object_uniqueness_field = "cn" 318 self.source.group_membership_field = "memberUid" 319 self.source.user_object_filter = "(objectClass=posixAccount)" 320 self.source.group_object_filter = "(objectClass=posixGroup)" 321 self.source.user_membership_attribute = "cn" 322 self.source.user_property_mappings.set( 323 [ 324 *LDAPSourcePropertyMapping.objects.filter( 325 Q(managed__startswith="goauthentik.io/sources/ldap/default") 326 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 327 ).all(), 328 LDAPSourcePropertyMapping.objects.create( 329 name="name", 330 expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}', 331 ), 332 ] 333 ) 334 self.source.group_property_mappings.set( 335 LDAPSourcePropertyMapping.objects.filter( 336 managed="goauthentik.io/sources/ldap/openldap-cn" 337 ) 338 ) 339 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 340 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 341 self.source.save() 342 user_sync = UserLDAPSynchronizer(self.source, Task()) 343 user_sync.sync_full() 344 group_sync = GroupLDAPSynchronizer(self.source, Task()) 345 group_sync.sync_full() 346 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 347 membership_sync.sync_full() 348 # Test if membership mapping based on memberUid works. 349 posix_group = Group.objects.filter(name="group-posix").first() 350 self.assertTrue(posix_group.users.filter(name="user-posix").exists()) 351 352 def test_tasks_ad(self): 353 """Test Scheduled tasks""" 354 self.source.user_property_mappings.set( 355 LDAPSourcePropertyMapping.objects.filter( 356 Q(managed__startswith="goauthentik.io/sources/ldap/default") 357 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 358 ) 359 ) 360 self.source.save() 361 connection = MagicMock(return_value=mock_ad_connection()) 362 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 363 ldap_sync.send(self.source.pk) 364 365 def test_tasks_openldap(self): 366 """Test Scheduled tasks""" 367 self.source.object_uniqueness_field = "uid" 368 self.source.group_object_filter = "(objectClass=groupOfNames)" 369 self.source.user_property_mappings.set( 370 LDAPSourcePropertyMapping.objects.filter( 371 Q(managed__startswith="goauthentik.io/sources/ldap/default") 372 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 373 ) 374 ) 375 self.source.save() 376 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 377 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 378 ldap_sync.send(self.source.pk) 379 380 def test_user_deletion(self): 381 """Test user deletion""" 382 user = User.objects.create_user(username="not-in-the-source") 383 UserLDAPSourceConnection.objects.create( 384 user=user, source=self.source, identifier="not-in-the-source" 385 ) 386 self.source.object_uniqueness_field = "uid" 387 self.source.group_object_filter = "(objectClass=groupOfNames)" 388 self.source.delete_not_found_objects = True 389 self.source.save() 390 391 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 392 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 393 ldap_sync.send(self.source.pk) 394 self.assertFalse(User.objects.filter(username="not-in-the-source").exists()) 395 396 def test_user_deletion_still_in_source(self): 397 """Test that user is not deleted if it's still in the source""" 398 username = user_in_slapd_cn 399 identifier = user_in_slapd_uid 400 user = User.objects.create_user(username=username) 401 UserLDAPSourceConnection.objects.create( 402 user=user, source=self.source, identifier=identifier 403 ) 404 self.source.object_uniqueness_field = "uid" 405 self.source.group_object_filter = "(objectClass=groupOfNames)" 406 self.source.delete_not_found_objects = True 407 self.source.save() 408 409 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 410 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 411 ldap_sync.send(self.source.pk) 412 self.assertTrue(User.objects.filter(username=username).exists()) 413 414 def test_user_deletion_no_sync(self): 415 """Test that user is not deleted if sync_users is False""" 416 user = User.objects.create_user(username="not-in-the-source") 417 UserLDAPSourceConnection.objects.create( 418 user=user, source=self.source, identifier="not-in-the-source" 419 ) 420 self.source.object_uniqueness_field = "uid" 421 self.source.group_object_filter = "(objectClass=groupOfNames)" 422 self.source.delete_not_found_objects = True 423 self.source.sync_users = False 424 self.source.save() 425 426 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 427 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 428 ldap_sync.send(self.source.pk) 429 self.assertTrue(User.objects.filter(username="not-in-the-source").exists()) 430 431 def test_user_deletion_no_delete(self): 432 """Test that user is not deleted if delete_not_found_objects is False""" 433 user = User.objects.create_user(username="not-in-the-source") 434 UserLDAPSourceConnection.objects.create( 435 user=user, source=self.source, identifier="not-in-the-source" 436 ) 437 self.source.object_uniqueness_field = "uid" 438 self.source.group_object_filter = "(objectClass=groupOfNames)" 439 self.source.save() 440 441 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 442 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 443 ldap_sync.send(self.source.pk) 444 self.assertTrue(User.objects.filter(username="not-in-the-source").exists()) 445 446 def test_group_deletion(self): 447 """Test group deletion""" 448 group = Group.objects.create(name="not-in-the-source") 449 GroupLDAPSourceConnection.objects.create( 450 group=group, source=self.source, identifier="not-in-the-source" 451 ) 452 self.source.object_uniqueness_field = "uid" 453 self.source.group_object_filter = "(objectClass=groupOfNames)" 454 self.source.delete_not_found_objects = True 455 self.source.save() 456 457 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 458 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 459 ldap_sync.send(self.source.pk) 460 self.assertFalse(Group.objects.filter(name="not-in-the-source").exists()) 461 462 def test_group_deletion_still_in_source(self): 463 """Test that group is not deleted if it's still in the source""" 464 groupname = group_in_slapd_cn 465 identifier = group_in_slapd_uid 466 group = Group.objects.create(name=groupname) 467 GroupLDAPSourceConnection.objects.create( 468 group=group, source=self.source, identifier=identifier 469 ) 470 self.source.object_uniqueness_field = "uid" 471 self.source.group_object_filter = "(objectClass=groupOfNames)" 472 self.source.delete_not_found_objects = True 473 self.source.save() 474 475 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 476 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 477 ldap_sync.send(self.source.pk) 478 self.assertTrue(Group.objects.filter(name=groupname).exists()) 479 480 def test_group_deletion_no_sync(self): 481 """Test that group is not deleted if sync_groups is False""" 482 group = Group.objects.create(name="not-in-the-source") 483 GroupLDAPSourceConnection.objects.create( 484 group=group, source=self.source, identifier="not-in-the-source" 485 ) 486 self.source.object_uniqueness_field = "uid" 487 self.source.group_object_filter = "(objectClass=groupOfNames)" 488 self.source.delete_not_found_objects = True 489 self.source.sync_groups = False 490 self.source.save() 491 492 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 493 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 494 ldap_sync.send(self.source.pk) 495 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists()) 496 497 def test_group_deletion_no_delete(self): 498 """Test that group is not deleted if delete_not_found_objects is False""" 499 group = Group.objects.create(name="not-in-the-source") 500 GroupLDAPSourceConnection.objects.create( 501 group=group, source=self.source, identifier="not-in-the-source" 502 ) 503 self.source.object_uniqueness_field = "uid" 504 self.source.group_object_filter = "(objectClass=groupOfNames)" 505 self.source.save() 506 507 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 508 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 509 ldap_sync.send(self.source.pk) 510 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists()) 511 512 def test_batch_deletion(self): 513 """Test batch deletion""" 514 BATCH_SIZE = DELETE_CHUNK_SIZE + 1 515 for i in range(BATCH_SIZE): 516 user = User.objects.create_user(username=f"not-in-the-source-{i}") 517 group = Group.objects.create(name=f"not-in-the-source-{i}") 518 group.users.add(user) 519 UserLDAPSourceConnection.objects.create( 520 user=user, source=self.source, identifier=f"not-in-the-source-{i}-user" 521 ) 522 GroupLDAPSourceConnection.objects.create( 523 group=group, source=self.source, identifier=f"not-in-the-source-{i}-group" 524 ) 525 self.source.object_uniqueness_field = "uid" 526 self.source.group_object_filter = "(objectClass=groupOfNames)" 527 self.source.delete_not_found_objects = True 528 self.source.save() 529 530 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 531 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 532 ldap_sync.send(self.source.pk) 533 534 self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists()) 535 self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists()) 536 537 def test_membership_sync_special_chars_in_group_dn(self): 538 """Test membership synchronization with special characters in group DN""" 539 self.source.object_uniqueness_field = "uid" 540 self.source.group_object_filter = "(objectClass=groupOfNames)" 541 self.source.lookup_groups_from_user = True 542 self.source.group_membership_field = "memberOf" 543 544 # Mock connection with group DN containing special characters 545 mock_conn = MagicMock() 546 547 # Simulate group with special characters in DN: parentheses, backslashes, asterisks 548 special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com" 549 backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com" 550 asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com" 551 552 # Mock the paged_search method that would be called with the filter 553 mock_standard = MagicMock() 554 mock_conn.extend.standard = mock_standard 555 556 # Test case 1: Group DN with parentheses 557 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 558 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 559 560 # Simulate group data with special characters in DN 561 page_data = [{"dn": special_group_dn}] 562 563 # This should not raise LDAPInvalidFilterError anymore 564 try: 565 membership_sync.sync(page_data) 566 # Verify that the filter was properly escaped 567 # The call should have been made with escaped characters 568 mock_standard.paged_search.assert_called() 569 call_args = mock_standard.paged_search.call_args 570 search_filter = call_args[1]["search_filter"] 571 # The parentheses should be escaped as \28 and \29 572 self.assertIn("\\28", search_filter) # Escaped ( 573 self.assertIn("\\29", search_filter) # Escaped ) 574 except LDAPInvalidFilterError: 575 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 576 577 # Test case 2: Group DN with backslashes 578 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 579 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 580 page_data = [{"dn": backslash_group_dn}] 581 582 try: 583 membership_sync.sync(page_data) 584 call_args = mock_standard.paged_search.call_args 585 search_filter = call_args[1]["search_filter"] 586 # The backslash should be escaped as \5c 587 self.assertIn("\\5c", search_filter) # Escaped \ 588 except LDAPInvalidFilterError: 589 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 590 591 # Test case 3: Group DN with asterisks 592 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 593 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 594 page_data = [{"dn": asterisk_group_dn}] 595 596 try: 597 membership_sync.sync(page_data) 598 call_args = mock_standard.paged_search.call_args 599 search_filter = call_args[1]["search_filter"] 600 # The asterisk should be escaped as \2a 601 self.assertIn("\\2a", search_filter) # Escaped * 602 except LDAPInvalidFilterError: 603 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 604 605 def test_escape_filter_chars_function(self): 606 """Test the escape_filter_chars function directly""" 607 608 # Test various special characters that need escaping 609 test_cases = [ 610 ("test(group)", "test\\28group\\29"), # parentheses 611 ("test\\group", "test\\5cgroup"), # backslash 612 ("test*group", "test\\2agroup"), # asterisk 613 ("test(*)group", "test\\28\\2a\\29group"), # multiple special chars 614 ("normalgroup", "normalgroup"), # no special chars 615 ("", ""), # empty string 616 ] 617 618 for input_str, expected in test_cases: 619 with self.subTest(input_str=input_str): 620 result = escape_filter_chars(input_str) 621 self.assertEqual(result, expected)
43class LDAPSyncTests(TestCase): 44 """LDAP Sync tests""" 45 46 @apply_blueprint("system/sources-ldap.yaml") 47 def setUp(self): 48 self.source: LDAPSource = LDAPSource.objects.create( 49 name="ldap", 50 slug="ldap", 51 base_dn="dc=goauthentik,dc=io", 52 additional_user_dn="ou=users", 53 additional_group_dn="ou=groups", 54 ) 55 56 def test_sync_missing_page(self): 57 """Test sync with missing page""" 58 connection = MagicMock(return_value=mock_ad_connection()) 59 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 60 ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo") 61 62 def test_sync_error(self): 63 """Test user sync""" 64 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 65 self.source.additional_user_dn = "" 66 self.source.additional_group_dn = "" 67 self.source.save() 68 self.source.user_property_mappings.set( 69 LDAPSourcePropertyMapping.objects.filter( 70 Q(managed__startswith="goauthentik.io/sources/ldap/default") 71 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 72 ) 73 ) 74 mapping = LDAPSourcePropertyMapping.objects.create( 75 name="name", 76 expression="q", 77 ) 78 self.source.user_property_mappings.set([mapping]) 79 self.source.save() 80 connection = MagicMock(return_value=mock_ad_connection()) 81 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 82 user_sync = UserLDAPSynchronizer(self.source, Task()) 83 with self.assertRaises(StopSync): 84 user_sync.sync_full() 85 self.assertFalse(User.objects.filter(username="user0_sn").exists()) 86 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 87 events = Event.objects.filter( 88 action=EventAction.CONFIGURATION_ERROR, 89 context__message="Failed to evaluate property mapping: 'name'", 90 context__mapping__pk=mapping.pk.hex, 91 ) 92 self.assertTrue(events.exists()) 93 94 def test_sync_mapping(self): 95 """Test property mappings""" 96 none = LDAPSourcePropertyMapping.objects.create( 97 name=generate_id(), expression="return None" 98 ) 99 byte_mapping = LDAPSourcePropertyMapping.objects.create( 100 name=generate_id(), expression="return b''" 101 ) 102 self.source.user_property_mappings.set( 103 LDAPSourcePropertyMapping.objects.filter( 104 Q(managed__startswith="goauthentik.io/sources/ldap/default") 105 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 106 ) 107 ) 108 self.source.user_property_mappings.add(none, byte_mapping) 109 connection = MagicMock(return_value=mock_ad_connection()) 110 111 # we basically just test that the mappings don't throw errors 112 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 113 user_sync = UserLDAPSynchronizer(self.source, Task()) 114 user_sync.sync_full() 115 116 def test_sync_users_ad(self): 117 """Test user sync""" 118 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 119 self.source.additional_user_dn = "" 120 self.source.additional_group_dn = "" 121 self.source.save() 122 self.source.user_property_mappings.set( 123 LDAPSourcePropertyMapping.objects.filter( 124 Q(managed__startswith="goauthentik.io/sources/ldap/default") 125 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 126 ) 127 ) 128 connection = MagicMock(return_value=mock_ad_connection()) 129 130 # Create the user beforehand so we can set attributes and check they aren't removed 131 user = User.objects.create( 132 username="erin.h", 133 attributes={ 134 "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114", 135 "foo": "bar", 136 }, 137 ) 138 139 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 140 user_sync = UserLDAPSynchronizer(self.source, Task()) 141 user_sync.sync_full() 142 143 user.refresh_from_db() 144 self.assertEqual(user.name, "Erin M. Hagens") 145 self.assertEqual(user.attributes["foo"], "bar") 146 self.assertTrue(user.is_active) 147 self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test") 148 149 deactivated = User.objects.filter(username="deactivated.a").first() 150 self.assertIsNotNone(deactivated) 151 self.assertFalse(deactivated.is_active) 152 153 def test_sync_users_openldap(self): 154 """Test user sync""" 155 self.source.object_uniqueness_field = "uid" 156 self.source.user_property_mappings.set( 157 LDAPSourcePropertyMapping.objects.filter( 158 Q(managed__startswith="goauthentik.io/sources/ldap/default") 159 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 160 ) 161 ) 162 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 163 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 164 user_sync = UserLDAPSynchronizer(self.source, Task()) 165 user_sync.sync_full() 166 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 167 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 168 169 def test_sync_users_freeipa_ish(self): 170 """Test user sync (FreeIPA-ish), mainly testing vendor quirks""" 171 self.source.object_uniqueness_field = "uid" 172 self.source.user_property_mappings.set( 173 LDAPSourcePropertyMapping.objects.filter( 174 Q(managed__startswith="goauthentik.io/sources/ldap/default") 175 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 176 ) 177 ) 178 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 179 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 180 user_sync = UserLDAPSynchronizer(self.source, Task()) 181 user_sync.sync_full() 182 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 183 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 184 self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active) 185 186 def test_sync_groups_freeipa_memberOf(self): 187 """Test group sync when membership is derived from memberOf user attribute""" 188 self.source.object_uniqueness_field = "uid" 189 self.source.group_object_filter = "(objectClass=groupOfNames)" 190 self.source.lookup_groups_from_user = True 191 self.source.group_membership_field = "memberOf" 192 self.source.user_property_mappings.set( 193 LDAPSourcePropertyMapping.objects.filter( 194 Q(managed__startswith="goauthentik.io/sources/ldap/default") 195 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 196 ) 197 ) 198 self.source.group_property_mappings.set( 199 LDAPSourcePropertyMapping.objects.filter( 200 managed="goauthentik.io/sources/ldap/openldap-cn" 201 ) 202 ) 203 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 204 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 205 user_sync = UserLDAPSynchronizer(self.source, Task()) 206 user_sync.sync_full() 207 group_sync = GroupLDAPSynchronizer(self.source, Task()) 208 group_sync.sync_full() 209 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 210 membership_sync.sync_full() 211 212 self.assertTrue( 213 User.objects.filter(username="user4_sn").exists(), "User does not exist" 214 ) 215 # Test if membership mapping based on memberOf works. 216 memberof_group = Group.objects.filter(name="reverse-lookup-group") 217 self.assertTrue(memberof_group.exists(), "Group does not exist") 218 self.assertTrue( 219 memberof_group.first().users.filter(username="user4_sn").exists(), 220 "User not a member of the group", 221 ) 222 223 def test_sync_groups_ad(self): 224 """Test group sync""" 225 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 226 self.source.additional_user_dn = "" 227 self.source.additional_group_dn = "" 228 self.source.save() 229 self.source.user_property_mappings.set( 230 LDAPSourcePropertyMapping.objects.filter( 231 Q(managed__startswith="goauthentik.io/sources/ldap/default") 232 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 233 ) 234 ) 235 self.source.group_property_mappings.set( 236 LDAPSourcePropertyMapping.objects.filter( 237 managed="goauthentik.io/sources/ldap/default-name" 238 ) 239 ) 240 connection = MagicMock(return_value=mock_ad_connection()) 241 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 242 _user = create_test_admin_user() 243 parent_group = Group.objects.get(name=_user.username) 244 self.source.sync_parent_group = parent_group 245 self.source.save() 246 group_sync = GroupLDAPSynchronizer(self.source, Task()) 247 group_sync.sync_full() 248 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 249 membership_sync.sync_full() 250 group: Group = Group.objects.filter(name="Test Group").first() 251 self.assertIsNotNone(group) 252 self.assertEqual(group.parents.first(), parent_group) 253 254 def test_sync_groups_openldap(self): 255 """Test group sync""" 256 self.source.object_uniqueness_field = "uid" 257 self.source.group_object_filter = "(objectClass=groupOfNames)" 258 self.source.user_property_mappings.set( 259 LDAPSourcePropertyMapping.objects.filter( 260 Q(managed__startswith="goauthentik.io/sources/ldap/default") 261 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 262 ) 263 ) 264 self.source.group_property_mappings.set( 265 LDAPSourcePropertyMapping.objects.filter( 266 managed="goauthentik.io/sources/ldap/openldap-cn" 267 ) 268 ) 269 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 270 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 271 self.source.save() 272 group_sync = GroupLDAPSynchronizer(self.source, Task()) 273 group_sync.sync_full() 274 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 275 membership_sync.sync_full() 276 group = Group.objects.filter(name="group1") 277 self.assertTrue(group.exists()) 278 279 def test_sync_groups_openldap_posix_group(self): 280 """Test posix group sync""" 281 self.source.object_uniqueness_field = "cn" 282 self.source.group_membership_field = "memberUid" 283 self.source.user_object_filter = "(objectClass=posixAccount)" 284 self.source.group_object_filter = "(objectClass=posixGroup)" 285 self.source.user_membership_attribute = "uid" 286 self.source.user_property_mappings.set( 287 [ 288 *LDAPSourcePropertyMapping.objects.filter( 289 Q(managed__startswith="goauthentik.io/sources/ldap/default") 290 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 291 ).all(), 292 LDAPSourcePropertyMapping.objects.create( 293 name="name", 294 expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}', 295 ), 296 ] 297 ) 298 self.source.group_property_mappings.set( 299 LDAPSourcePropertyMapping.objects.filter( 300 managed="goauthentik.io/sources/ldap/openldap-cn" 301 ) 302 ) 303 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 304 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 305 self.source.save() 306 user_sync = UserLDAPSynchronizer(self.source, Task()) 307 user_sync.sync_full() 308 group_sync = GroupLDAPSynchronizer(self.source, Task()) 309 group_sync.sync_full() 310 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 311 membership_sync.sync_full() 312 # Test if membership mapping based on memberUid works. 313 posix_group = Group.objects.filter(name="group-posix").first() 314 self.assertTrue(posix_group.users.filter(name="user-posix").exists()) 315 316 def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self): 317 """Test posix group sync""" 318 self.source.object_uniqueness_field = "cn" 319 self.source.group_membership_field = "memberUid" 320 self.source.user_object_filter = "(objectClass=posixAccount)" 321 self.source.group_object_filter = "(objectClass=posixGroup)" 322 self.source.user_membership_attribute = "cn" 323 self.source.user_property_mappings.set( 324 [ 325 *LDAPSourcePropertyMapping.objects.filter( 326 Q(managed__startswith="goauthentik.io/sources/ldap/default") 327 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 328 ).all(), 329 LDAPSourcePropertyMapping.objects.create( 330 name="name", 331 expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}', 332 ), 333 ] 334 ) 335 self.source.group_property_mappings.set( 336 LDAPSourcePropertyMapping.objects.filter( 337 managed="goauthentik.io/sources/ldap/openldap-cn" 338 ) 339 ) 340 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 341 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 342 self.source.save() 343 user_sync = UserLDAPSynchronizer(self.source, Task()) 344 user_sync.sync_full() 345 group_sync = GroupLDAPSynchronizer(self.source, Task()) 346 group_sync.sync_full() 347 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 348 membership_sync.sync_full() 349 # Test if membership mapping based on memberUid works. 350 posix_group = Group.objects.filter(name="group-posix").first() 351 self.assertTrue(posix_group.users.filter(name="user-posix").exists()) 352 353 def test_tasks_ad(self): 354 """Test Scheduled tasks""" 355 self.source.user_property_mappings.set( 356 LDAPSourcePropertyMapping.objects.filter( 357 Q(managed__startswith="goauthentik.io/sources/ldap/default") 358 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 359 ) 360 ) 361 self.source.save() 362 connection = MagicMock(return_value=mock_ad_connection()) 363 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 364 ldap_sync.send(self.source.pk) 365 366 def test_tasks_openldap(self): 367 """Test Scheduled tasks""" 368 self.source.object_uniqueness_field = "uid" 369 self.source.group_object_filter = "(objectClass=groupOfNames)" 370 self.source.user_property_mappings.set( 371 LDAPSourcePropertyMapping.objects.filter( 372 Q(managed__startswith="goauthentik.io/sources/ldap/default") 373 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 374 ) 375 ) 376 self.source.save() 377 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 378 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 379 ldap_sync.send(self.source.pk) 380 381 def test_user_deletion(self): 382 """Test user deletion""" 383 user = User.objects.create_user(username="not-in-the-source") 384 UserLDAPSourceConnection.objects.create( 385 user=user, source=self.source, identifier="not-in-the-source" 386 ) 387 self.source.object_uniqueness_field = "uid" 388 self.source.group_object_filter = "(objectClass=groupOfNames)" 389 self.source.delete_not_found_objects = True 390 self.source.save() 391 392 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 393 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 394 ldap_sync.send(self.source.pk) 395 self.assertFalse(User.objects.filter(username="not-in-the-source").exists()) 396 397 def test_user_deletion_still_in_source(self): 398 """Test that user is not deleted if it's still in the source""" 399 username = user_in_slapd_cn 400 identifier = user_in_slapd_uid 401 user = User.objects.create_user(username=username) 402 UserLDAPSourceConnection.objects.create( 403 user=user, source=self.source, identifier=identifier 404 ) 405 self.source.object_uniqueness_field = "uid" 406 self.source.group_object_filter = "(objectClass=groupOfNames)" 407 self.source.delete_not_found_objects = True 408 self.source.save() 409 410 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 411 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 412 ldap_sync.send(self.source.pk) 413 self.assertTrue(User.objects.filter(username=username).exists()) 414 415 def test_user_deletion_no_sync(self): 416 """Test that user is not deleted if sync_users is False""" 417 user = User.objects.create_user(username="not-in-the-source") 418 UserLDAPSourceConnection.objects.create( 419 user=user, source=self.source, identifier="not-in-the-source" 420 ) 421 self.source.object_uniqueness_field = "uid" 422 self.source.group_object_filter = "(objectClass=groupOfNames)" 423 self.source.delete_not_found_objects = True 424 self.source.sync_users = False 425 self.source.save() 426 427 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 428 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 429 ldap_sync.send(self.source.pk) 430 self.assertTrue(User.objects.filter(username="not-in-the-source").exists()) 431 432 def test_user_deletion_no_delete(self): 433 """Test that user is not deleted if delete_not_found_objects is False""" 434 user = User.objects.create_user(username="not-in-the-source") 435 UserLDAPSourceConnection.objects.create( 436 user=user, source=self.source, identifier="not-in-the-source" 437 ) 438 self.source.object_uniqueness_field = "uid" 439 self.source.group_object_filter = "(objectClass=groupOfNames)" 440 self.source.save() 441 442 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 443 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 444 ldap_sync.send(self.source.pk) 445 self.assertTrue(User.objects.filter(username="not-in-the-source").exists()) 446 447 def test_group_deletion(self): 448 """Test group deletion""" 449 group = Group.objects.create(name="not-in-the-source") 450 GroupLDAPSourceConnection.objects.create( 451 group=group, source=self.source, identifier="not-in-the-source" 452 ) 453 self.source.object_uniqueness_field = "uid" 454 self.source.group_object_filter = "(objectClass=groupOfNames)" 455 self.source.delete_not_found_objects = True 456 self.source.save() 457 458 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 459 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 460 ldap_sync.send(self.source.pk) 461 self.assertFalse(Group.objects.filter(name="not-in-the-source").exists()) 462 463 def test_group_deletion_still_in_source(self): 464 """Test that group is not deleted if it's still in the source""" 465 groupname = group_in_slapd_cn 466 identifier = group_in_slapd_uid 467 group = Group.objects.create(name=groupname) 468 GroupLDAPSourceConnection.objects.create( 469 group=group, source=self.source, identifier=identifier 470 ) 471 self.source.object_uniqueness_field = "uid" 472 self.source.group_object_filter = "(objectClass=groupOfNames)" 473 self.source.delete_not_found_objects = True 474 self.source.save() 475 476 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 477 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 478 ldap_sync.send(self.source.pk) 479 self.assertTrue(Group.objects.filter(name=groupname).exists()) 480 481 def test_group_deletion_no_sync(self): 482 """Test that group is not deleted if sync_groups is False""" 483 group = Group.objects.create(name="not-in-the-source") 484 GroupLDAPSourceConnection.objects.create( 485 group=group, source=self.source, identifier="not-in-the-source" 486 ) 487 self.source.object_uniqueness_field = "uid" 488 self.source.group_object_filter = "(objectClass=groupOfNames)" 489 self.source.delete_not_found_objects = True 490 self.source.sync_groups = False 491 self.source.save() 492 493 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 494 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 495 ldap_sync.send(self.source.pk) 496 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists()) 497 498 def test_group_deletion_no_delete(self): 499 """Test that group is not deleted if delete_not_found_objects is False""" 500 group = Group.objects.create(name="not-in-the-source") 501 GroupLDAPSourceConnection.objects.create( 502 group=group, source=self.source, identifier="not-in-the-source" 503 ) 504 self.source.object_uniqueness_field = "uid" 505 self.source.group_object_filter = "(objectClass=groupOfNames)" 506 self.source.save() 507 508 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 509 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 510 ldap_sync.send(self.source.pk) 511 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists()) 512 513 def test_batch_deletion(self): 514 """Test batch deletion""" 515 BATCH_SIZE = DELETE_CHUNK_SIZE + 1 516 for i in range(BATCH_SIZE): 517 user = User.objects.create_user(username=f"not-in-the-source-{i}") 518 group = Group.objects.create(name=f"not-in-the-source-{i}") 519 group.users.add(user) 520 UserLDAPSourceConnection.objects.create( 521 user=user, source=self.source, identifier=f"not-in-the-source-{i}-user" 522 ) 523 GroupLDAPSourceConnection.objects.create( 524 group=group, source=self.source, identifier=f"not-in-the-source-{i}-group" 525 ) 526 self.source.object_uniqueness_field = "uid" 527 self.source.group_object_filter = "(objectClass=groupOfNames)" 528 self.source.delete_not_found_objects = True 529 self.source.save() 530 531 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 532 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 533 ldap_sync.send(self.source.pk) 534 535 self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists()) 536 self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists()) 537 538 def test_membership_sync_special_chars_in_group_dn(self): 539 """Test membership synchronization with special characters in group DN""" 540 self.source.object_uniqueness_field = "uid" 541 self.source.group_object_filter = "(objectClass=groupOfNames)" 542 self.source.lookup_groups_from_user = True 543 self.source.group_membership_field = "memberOf" 544 545 # Mock connection with group DN containing special characters 546 mock_conn = MagicMock() 547 548 # Simulate group with special characters in DN: parentheses, backslashes, asterisks 549 special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com" 550 backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com" 551 asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com" 552 553 # Mock the paged_search method that would be called with the filter 554 mock_standard = MagicMock() 555 mock_conn.extend.standard = mock_standard 556 557 # Test case 1: Group DN with parentheses 558 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 559 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 560 561 # Simulate group data with special characters in DN 562 page_data = [{"dn": special_group_dn}] 563 564 # This should not raise LDAPInvalidFilterError anymore 565 try: 566 membership_sync.sync(page_data) 567 # Verify that the filter was properly escaped 568 # The call should have been made with escaped characters 569 mock_standard.paged_search.assert_called() 570 call_args = mock_standard.paged_search.call_args 571 search_filter = call_args[1]["search_filter"] 572 # The parentheses should be escaped as \28 and \29 573 self.assertIn("\\28", search_filter) # Escaped ( 574 self.assertIn("\\29", search_filter) # Escaped ) 575 except LDAPInvalidFilterError: 576 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 577 578 # Test case 2: Group DN with backslashes 579 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 580 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 581 page_data = [{"dn": backslash_group_dn}] 582 583 try: 584 membership_sync.sync(page_data) 585 call_args = mock_standard.paged_search.call_args 586 search_filter = call_args[1]["search_filter"] 587 # The backslash should be escaped as \5c 588 self.assertIn("\\5c", search_filter) # Escaped \ 589 except LDAPInvalidFilterError: 590 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 591 592 # Test case 3: Group DN with asterisks 593 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 594 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 595 page_data = [{"dn": asterisk_group_dn}] 596 597 try: 598 membership_sync.sync(page_data) 599 call_args = mock_standard.paged_search.call_args 600 search_filter = call_args[1]["search_filter"] 601 # The asterisk should be escaped as \2a 602 self.assertIn("\\2a", search_filter) # Escaped * 603 except LDAPInvalidFilterError: 604 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 605 606 def test_escape_filter_chars_function(self): 607 """Test the escape_filter_chars function directly""" 608 609 # Test various special characters that need escaping 610 test_cases = [ 611 ("test(group)", "test\\28group\\29"), # parentheses 612 ("test\\group", "test\\5cgroup"), # backslash 613 ("test*group", "test\\2agroup"), # asterisk 614 ("test(*)group", "test\\28\\2a\\29group"), # multiple special chars 615 ("normalgroup", "normalgroup"), # no special chars 616 ("", ""), # empty string 617 ] 618 619 for input_str, expected in test_cases: 620 with self.subTest(input_str=input_str): 621 result = escape_filter_chars(input_str) 622 self.assertEqual(result, expected)
LDAP Sync tests
46 @apply_blueprint("system/sources-ldap.yaml") 47 def setUp(self): 48 self.source: LDAPSource = LDAPSource.objects.create( 49 name="ldap", 50 slug="ldap", 51 base_dn="dc=goauthentik,dc=io", 52 additional_user_dn="ou=users", 53 additional_group_dn="ou=groups", 54 )
Hook method for setting up the test fixture before exercising it.
56 def test_sync_missing_page(self): 57 """Test sync with missing page""" 58 connection = MagicMock(return_value=mock_ad_connection()) 59 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 60 ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")
Test sync with missing page
62 def test_sync_error(self): 63 """Test user sync""" 64 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 65 self.source.additional_user_dn = "" 66 self.source.additional_group_dn = "" 67 self.source.save() 68 self.source.user_property_mappings.set( 69 LDAPSourcePropertyMapping.objects.filter( 70 Q(managed__startswith="goauthentik.io/sources/ldap/default") 71 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 72 ) 73 ) 74 mapping = LDAPSourcePropertyMapping.objects.create( 75 name="name", 76 expression="q", 77 ) 78 self.source.user_property_mappings.set([mapping]) 79 self.source.save() 80 connection = MagicMock(return_value=mock_ad_connection()) 81 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 82 user_sync = UserLDAPSynchronizer(self.source, Task()) 83 with self.assertRaises(StopSync): 84 user_sync.sync_full() 85 self.assertFalse(User.objects.filter(username="user0_sn").exists()) 86 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 87 events = Event.objects.filter( 88 action=EventAction.CONFIGURATION_ERROR, 89 context__message="Failed to evaluate property mapping: 'name'", 90 context__mapping__pk=mapping.pk.hex, 91 ) 92 self.assertTrue(events.exists())
Test user sync
94 def test_sync_mapping(self): 95 """Test property mappings""" 96 none = LDAPSourcePropertyMapping.objects.create( 97 name=generate_id(), expression="return None" 98 ) 99 byte_mapping = LDAPSourcePropertyMapping.objects.create( 100 name=generate_id(), expression="return b''" 101 ) 102 self.source.user_property_mappings.set( 103 LDAPSourcePropertyMapping.objects.filter( 104 Q(managed__startswith="goauthentik.io/sources/ldap/default") 105 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 106 ) 107 ) 108 self.source.user_property_mappings.add(none, byte_mapping) 109 connection = MagicMock(return_value=mock_ad_connection()) 110 111 # we basically just test that the mappings don't throw errors 112 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 113 user_sync = UserLDAPSynchronizer(self.source, Task()) 114 user_sync.sync_full()
Test property mappings
116 def test_sync_users_ad(self): 117 """Test user sync""" 118 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 119 self.source.additional_user_dn = "" 120 self.source.additional_group_dn = "" 121 self.source.save() 122 self.source.user_property_mappings.set( 123 LDAPSourcePropertyMapping.objects.filter( 124 Q(managed__startswith="goauthentik.io/sources/ldap/default") 125 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 126 ) 127 ) 128 connection = MagicMock(return_value=mock_ad_connection()) 129 130 # Create the user beforehand so we can set attributes and check they aren't removed 131 user = User.objects.create( 132 username="erin.h", 133 attributes={ 134 "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114", 135 "foo": "bar", 136 }, 137 ) 138 139 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 140 user_sync = UserLDAPSynchronizer(self.source, Task()) 141 user_sync.sync_full() 142 143 user.refresh_from_db() 144 self.assertEqual(user.name, "Erin M. Hagens") 145 self.assertEqual(user.attributes["foo"], "bar") 146 self.assertTrue(user.is_active) 147 self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test") 148 149 deactivated = User.objects.filter(username="deactivated.a").first() 150 self.assertIsNotNone(deactivated) 151 self.assertFalse(deactivated.is_active)
Test user sync
153 def test_sync_users_openldap(self): 154 """Test user sync""" 155 self.source.object_uniqueness_field = "uid" 156 self.source.user_property_mappings.set( 157 LDAPSourcePropertyMapping.objects.filter( 158 Q(managed__startswith="goauthentik.io/sources/ldap/default") 159 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 160 ) 161 ) 162 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 163 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 164 user_sync = UserLDAPSynchronizer(self.source, Task()) 165 user_sync.sync_full() 166 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 167 self.assertFalse(User.objects.filter(username="user1_sn").exists())
Test user sync
169 def test_sync_users_freeipa_ish(self): 170 """Test user sync (FreeIPA-ish), mainly testing vendor quirks""" 171 self.source.object_uniqueness_field = "uid" 172 self.source.user_property_mappings.set( 173 LDAPSourcePropertyMapping.objects.filter( 174 Q(managed__startswith="goauthentik.io/sources/ldap/default") 175 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 176 ) 177 ) 178 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 179 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 180 user_sync = UserLDAPSynchronizer(self.source, Task()) 181 user_sync.sync_full() 182 self.assertTrue(User.objects.filter(username="user0_sn").exists()) 183 self.assertFalse(User.objects.filter(username="user1_sn").exists()) 184 self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)
Test user sync (FreeIPA-ish), mainly testing vendor quirks
186 def test_sync_groups_freeipa_memberOf(self): 187 """Test group sync when membership is derived from memberOf user attribute""" 188 self.source.object_uniqueness_field = "uid" 189 self.source.group_object_filter = "(objectClass=groupOfNames)" 190 self.source.lookup_groups_from_user = True 191 self.source.group_membership_field = "memberOf" 192 self.source.user_property_mappings.set( 193 LDAPSourcePropertyMapping.objects.filter( 194 Q(managed__startswith="goauthentik.io/sources/ldap/default") 195 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 196 ) 197 ) 198 self.source.group_property_mappings.set( 199 LDAPSourcePropertyMapping.objects.filter( 200 managed="goauthentik.io/sources/ldap/openldap-cn" 201 ) 202 ) 203 connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD)) 204 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 205 user_sync = UserLDAPSynchronizer(self.source, Task()) 206 user_sync.sync_full() 207 group_sync = GroupLDAPSynchronizer(self.source, Task()) 208 group_sync.sync_full() 209 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 210 membership_sync.sync_full() 211 212 self.assertTrue( 213 User.objects.filter(username="user4_sn").exists(), "User does not exist" 214 ) 215 # Test if membership mapping based on memberOf works. 216 memberof_group = Group.objects.filter(name="reverse-lookup-group") 217 self.assertTrue(memberof_group.exists(), "Group does not exist") 218 self.assertTrue( 219 memberof_group.first().users.filter(username="user4_sn").exists(), 220 "User not a member of the group", 221 )
Test group sync when membership is derived from memberOf user attribute
223 def test_sync_groups_ad(self): 224 """Test group sync""" 225 self.source.base_dn = "dc=t,dc=goauthentik,dc=io" 226 self.source.additional_user_dn = "" 227 self.source.additional_group_dn = "" 228 self.source.save() 229 self.source.user_property_mappings.set( 230 LDAPSourcePropertyMapping.objects.filter( 231 Q(managed__startswith="goauthentik.io/sources/ldap/default") 232 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 233 ) 234 ) 235 self.source.group_property_mappings.set( 236 LDAPSourcePropertyMapping.objects.filter( 237 managed="goauthentik.io/sources/ldap/default-name" 238 ) 239 ) 240 connection = MagicMock(return_value=mock_ad_connection()) 241 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 242 _user = create_test_admin_user() 243 parent_group = Group.objects.get(name=_user.username) 244 self.source.sync_parent_group = parent_group 245 self.source.save() 246 group_sync = GroupLDAPSynchronizer(self.source, Task()) 247 group_sync.sync_full() 248 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 249 membership_sync.sync_full() 250 group: Group = Group.objects.filter(name="Test Group").first() 251 self.assertIsNotNone(group) 252 self.assertEqual(group.parents.first(), parent_group)
Test group sync
254 def test_sync_groups_openldap(self): 255 """Test group sync""" 256 self.source.object_uniqueness_field = "uid" 257 self.source.group_object_filter = "(objectClass=groupOfNames)" 258 self.source.user_property_mappings.set( 259 LDAPSourcePropertyMapping.objects.filter( 260 Q(managed__startswith="goauthentik.io/sources/ldap/default") 261 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 262 ) 263 ) 264 self.source.group_property_mappings.set( 265 LDAPSourcePropertyMapping.objects.filter( 266 managed="goauthentik.io/sources/ldap/openldap-cn" 267 ) 268 ) 269 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 270 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 271 self.source.save() 272 group_sync = GroupLDAPSynchronizer(self.source, Task()) 273 group_sync.sync_full() 274 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 275 membership_sync.sync_full() 276 group = Group.objects.filter(name="group1") 277 self.assertTrue(group.exists())
Test group sync
279 def test_sync_groups_openldap_posix_group(self): 280 """Test posix group sync""" 281 self.source.object_uniqueness_field = "cn" 282 self.source.group_membership_field = "memberUid" 283 self.source.user_object_filter = "(objectClass=posixAccount)" 284 self.source.group_object_filter = "(objectClass=posixGroup)" 285 self.source.user_membership_attribute = "uid" 286 self.source.user_property_mappings.set( 287 [ 288 *LDAPSourcePropertyMapping.objects.filter( 289 Q(managed__startswith="goauthentik.io/sources/ldap/default") 290 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 291 ).all(), 292 LDAPSourcePropertyMapping.objects.create( 293 name="name", 294 expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}', 295 ), 296 ] 297 ) 298 self.source.group_property_mappings.set( 299 LDAPSourcePropertyMapping.objects.filter( 300 managed="goauthentik.io/sources/ldap/openldap-cn" 301 ) 302 ) 303 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 304 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 305 self.source.save() 306 user_sync = UserLDAPSynchronizer(self.source, Task()) 307 user_sync.sync_full() 308 group_sync = GroupLDAPSynchronizer(self.source, Task()) 309 group_sync.sync_full() 310 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 311 membership_sync.sync_full() 312 # Test if membership mapping based on memberUid works. 313 posix_group = Group.objects.filter(name="group-posix").first() 314 self.assertTrue(posix_group.users.filter(name="user-posix").exists())
Test posix group sync
316 def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self): 317 """Test posix group sync""" 318 self.source.object_uniqueness_field = "cn" 319 self.source.group_membership_field = "memberUid" 320 self.source.user_object_filter = "(objectClass=posixAccount)" 321 self.source.group_object_filter = "(objectClass=posixGroup)" 322 self.source.user_membership_attribute = "cn" 323 self.source.user_property_mappings.set( 324 [ 325 *LDAPSourcePropertyMapping.objects.filter( 326 Q(managed__startswith="goauthentik.io/sources/ldap/default") 327 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 328 ).all(), 329 LDAPSourcePropertyMapping.objects.create( 330 name="name", 331 expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}', 332 ), 333 ] 334 ) 335 self.source.group_property_mappings.set( 336 LDAPSourcePropertyMapping.objects.filter( 337 managed="goauthentik.io/sources/ldap/openldap-cn" 338 ) 339 ) 340 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 341 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 342 self.source.save() 343 user_sync = UserLDAPSynchronizer(self.source, Task()) 344 user_sync.sync_full() 345 group_sync = GroupLDAPSynchronizer(self.source, Task()) 346 group_sync.sync_full() 347 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 348 membership_sync.sync_full() 349 # Test if membership mapping based on memberUid works. 350 posix_group = Group.objects.filter(name="group-posix").first() 351 self.assertTrue(posix_group.users.filter(name="user-posix").exists())
Test posix group sync
353 def test_tasks_ad(self): 354 """Test Scheduled tasks""" 355 self.source.user_property_mappings.set( 356 LDAPSourcePropertyMapping.objects.filter( 357 Q(managed__startswith="goauthentik.io/sources/ldap/default") 358 | Q(managed__startswith="goauthentik.io/sources/ldap/ms") 359 ) 360 ) 361 self.source.save() 362 connection = MagicMock(return_value=mock_ad_connection()) 363 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 364 ldap_sync.send(self.source.pk)
Test Scheduled tasks
366 def test_tasks_openldap(self): 367 """Test Scheduled tasks""" 368 self.source.object_uniqueness_field = "uid" 369 self.source.group_object_filter = "(objectClass=groupOfNames)" 370 self.source.user_property_mappings.set( 371 LDAPSourcePropertyMapping.objects.filter( 372 Q(managed__startswith="goauthentik.io/sources/ldap/default") 373 | Q(managed__startswith="goauthentik.io/sources/ldap/openldap") 374 ) 375 ) 376 self.source.save() 377 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 378 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 379 ldap_sync.send(self.source.pk)
Test Scheduled tasks
381 def test_user_deletion(self): 382 """Test user deletion""" 383 user = User.objects.create_user(username="not-in-the-source") 384 UserLDAPSourceConnection.objects.create( 385 user=user, source=self.source, identifier="not-in-the-source" 386 ) 387 self.source.object_uniqueness_field = "uid" 388 self.source.group_object_filter = "(objectClass=groupOfNames)" 389 self.source.delete_not_found_objects = True 390 self.source.save() 391 392 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 393 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 394 ldap_sync.send(self.source.pk) 395 self.assertFalse(User.objects.filter(username="not-in-the-source").exists())
Test user deletion
397 def test_user_deletion_still_in_source(self): 398 """Test that user is not deleted if it's still in the source""" 399 username = user_in_slapd_cn 400 identifier = user_in_slapd_uid 401 user = User.objects.create_user(username=username) 402 UserLDAPSourceConnection.objects.create( 403 user=user, source=self.source, identifier=identifier 404 ) 405 self.source.object_uniqueness_field = "uid" 406 self.source.group_object_filter = "(objectClass=groupOfNames)" 407 self.source.delete_not_found_objects = True 408 self.source.save() 409 410 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 411 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 412 ldap_sync.send(self.source.pk) 413 self.assertTrue(User.objects.filter(username=username).exists())
Test that user is not deleted if it's still in the source
415 def test_user_deletion_no_sync(self): 416 """Test that user is not deleted if sync_users is False""" 417 user = User.objects.create_user(username="not-in-the-source") 418 UserLDAPSourceConnection.objects.create( 419 user=user, source=self.source, identifier="not-in-the-source" 420 ) 421 self.source.object_uniqueness_field = "uid" 422 self.source.group_object_filter = "(objectClass=groupOfNames)" 423 self.source.delete_not_found_objects = True 424 self.source.sync_users = False 425 self.source.save() 426 427 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 428 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 429 ldap_sync.send(self.source.pk) 430 self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
Test that user is not deleted if sync_users is False
432 def test_user_deletion_no_delete(self): 433 """Test that user is not deleted if delete_not_found_objects is False""" 434 user = User.objects.create_user(username="not-in-the-source") 435 UserLDAPSourceConnection.objects.create( 436 user=user, source=self.source, identifier="not-in-the-source" 437 ) 438 self.source.object_uniqueness_field = "uid" 439 self.source.group_object_filter = "(objectClass=groupOfNames)" 440 self.source.save() 441 442 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 443 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 444 ldap_sync.send(self.source.pk) 445 self.assertTrue(User.objects.filter(username="not-in-the-source").exists())
Test that user is not deleted if delete_not_found_objects is False
447 def test_group_deletion(self): 448 """Test group deletion""" 449 group = Group.objects.create(name="not-in-the-source") 450 GroupLDAPSourceConnection.objects.create( 451 group=group, source=self.source, identifier="not-in-the-source" 452 ) 453 self.source.object_uniqueness_field = "uid" 454 self.source.group_object_filter = "(objectClass=groupOfNames)" 455 self.source.delete_not_found_objects = True 456 self.source.save() 457 458 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 459 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 460 ldap_sync.send(self.source.pk) 461 self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())
Test group deletion
463 def test_group_deletion_still_in_source(self): 464 """Test that group is not deleted if it's still in the source""" 465 groupname = group_in_slapd_cn 466 identifier = group_in_slapd_uid 467 group = Group.objects.create(name=groupname) 468 GroupLDAPSourceConnection.objects.create( 469 group=group, source=self.source, identifier=identifier 470 ) 471 self.source.object_uniqueness_field = "uid" 472 self.source.group_object_filter = "(objectClass=groupOfNames)" 473 self.source.delete_not_found_objects = True 474 self.source.save() 475 476 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 477 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 478 ldap_sync.send(self.source.pk) 479 self.assertTrue(Group.objects.filter(name=groupname).exists())
Test that group is not deleted if it's still in the source
481 def test_group_deletion_no_sync(self): 482 """Test that group is not deleted if sync_groups is False""" 483 group = Group.objects.create(name="not-in-the-source") 484 GroupLDAPSourceConnection.objects.create( 485 group=group, source=self.source, identifier="not-in-the-source" 486 ) 487 self.source.object_uniqueness_field = "uid" 488 self.source.group_object_filter = "(objectClass=groupOfNames)" 489 self.source.delete_not_found_objects = True 490 self.source.sync_groups = False 491 self.source.save() 492 493 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 494 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 495 ldap_sync.send(self.source.pk) 496 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
Test that group is not deleted if sync_groups is False
498 def test_group_deletion_no_delete(self): 499 """Test that group is not deleted if delete_not_found_objects is False""" 500 group = Group.objects.create(name="not-in-the-source") 501 GroupLDAPSourceConnection.objects.create( 502 group=group, source=self.source, identifier="not-in-the-source" 503 ) 504 self.source.object_uniqueness_field = "uid" 505 self.source.group_object_filter = "(objectClass=groupOfNames)" 506 self.source.save() 507 508 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 509 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 510 ldap_sync.send(self.source.pk) 511 self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())
Test that group is not deleted if delete_not_found_objects is False
513 def test_batch_deletion(self): 514 """Test batch deletion""" 515 BATCH_SIZE = DELETE_CHUNK_SIZE + 1 516 for i in range(BATCH_SIZE): 517 user = User.objects.create_user(username=f"not-in-the-source-{i}") 518 group = Group.objects.create(name=f"not-in-the-source-{i}") 519 group.users.add(user) 520 UserLDAPSourceConnection.objects.create( 521 user=user, source=self.source, identifier=f"not-in-the-source-{i}-user" 522 ) 523 GroupLDAPSourceConnection.objects.create( 524 group=group, source=self.source, identifier=f"not-in-the-source-{i}-group" 525 ) 526 self.source.object_uniqueness_field = "uid" 527 self.source.group_object_filter = "(objectClass=groupOfNames)" 528 self.source.delete_not_found_objects = True 529 self.source.save() 530 531 connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD)) 532 with patch("authentik.sources.ldap.models.LDAPSource.connection", connection): 533 ldap_sync.send(self.source.pk) 534 535 self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists()) 536 self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())
Test batch deletion
538 def test_membership_sync_special_chars_in_group_dn(self): 539 """Test membership synchronization with special characters in group DN""" 540 self.source.object_uniqueness_field = "uid" 541 self.source.group_object_filter = "(objectClass=groupOfNames)" 542 self.source.lookup_groups_from_user = True 543 self.source.group_membership_field = "memberOf" 544 545 # Mock connection with group DN containing special characters 546 mock_conn = MagicMock() 547 548 # Simulate group with special characters in DN: parentheses, backslashes, asterisks 549 special_group_dn = "cn=test(group),ou=groups,dc=example,dc=com" 550 backslash_group_dn = "cn=test\\group,ou=groups,dc=example,dc=com" 551 asterisk_group_dn = "cn=test*group,ou=groups,dc=example,dc=com" 552 553 # Mock the paged_search method that would be called with the filter 554 mock_standard = MagicMock() 555 mock_conn.extend.standard = mock_standard 556 557 # Test case 1: Group DN with parentheses 558 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 559 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 560 561 # Simulate group data with special characters in DN 562 page_data = [{"dn": special_group_dn}] 563 564 # This should not raise LDAPInvalidFilterError anymore 565 try: 566 membership_sync.sync(page_data) 567 # Verify that the filter was properly escaped 568 # The call should have been made with escaped characters 569 mock_standard.paged_search.assert_called() 570 call_args = mock_standard.paged_search.call_args 571 search_filter = call_args[1]["search_filter"] 572 # The parentheses should be escaped as \28 and \29 573 self.assertIn("\\28", search_filter) # Escaped ( 574 self.assertIn("\\29", search_filter) # Escaped ) 575 except LDAPInvalidFilterError: 576 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 577 578 # Test case 2: Group DN with backslashes 579 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 580 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 581 page_data = [{"dn": backslash_group_dn}] 582 583 try: 584 membership_sync.sync(page_data) 585 call_args = mock_standard.paged_search.call_args 586 search_filter = call_args[1]["search_filter"] 587 # The backslash should be escaped as \5c 588 self.assertIn("\\5c", search_filter) # Escaped \ 589 except LDAPInvalidFilterError: 590 self.fail("LDAPInvalidFilterError should not be raised with escaped filter") 591 592 # Test case 3: Group DN with asterisks 593 with patch("authentik.sources.ldap.models.LDAPSource.connection", return_value=mock_conn): 594 membership_sync = MembershipLDAPSynchronizer(self.source, Task()) 595 page_data = [{"dn": asterisk_group_dn}] 596 597 try: 598 membership_sync.sync(page_data) 599 call_args = mock_standard.paged_search.call_args 600 search_filter = call_args[1]["search_filter"] 601 # The asterisk should be escaped as \2a 602 self.assertIn("\\2a", search_filter) # Escaped * 603 except LDAPInvalidFilterError: 604 self.fail("LDAPInvalidFilterError should not be raised with escaped filter")
Test membership synchronization with special characters in group DN
606 def test_escape_filter_chars_function(self): 607 """Test the escape_filter_chars function directly""" 608 609 # Test various special characters that need escaping 610 test_cases = [ 611 ("test(group)", "test\\28group\\29"), # parentheses 612 ("test\\group", "test\\5cgroup"), # backslash 613 ("test*group", "test\\2agroup"), # asterisk 614 ("test(*)group", "test\\28\\2a\\29group"), # multiple special chars 615 ("normalgroup", "normalgroup"), # no special chars 616 ("", ""), # empty string 617 ] 618 619 for input_str, expected in test_cases: 620 with self.subTest(input_str=input_str): 621 result = escape_filter_chars(input_str) 622 self.assertEqual(result, expected)
Test the escape_filter_chars function directly