authentik.sources.ldap.tests.test_sync
LDAP Source tests
"""LDAP Source tests"""

from unittest.mock import MagicMock, patch

from django.db.models import Q
from django.test import TestCase
from ldap3.core.exceptions import LDAPInvalidFilterError
from ldap3.utils.conv import escape_filter_chars

from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import Group, User
from authentik.core.tests.utils import create_test_admin_user
from authentik.events.models import Event, EventAction
from authentik.lib.generators import generate_id, generate_key
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.lib.utils.reflection import class_to_path
from authentik.sources.ldap.models import (
    GroupLDAPSourceConnection,
    LDAPSource,
    LDAPSourcePropertyMapping,
    UserLDAPSourceConnection,
)
from authentik.sources.ldap.sync.forward_delete_users import DELETE_CHUNK_SIZE
from authentik.sources.ldap.sync.groups import GroupLDAPSynchronizer
from authentik.sources.ldap.sync.membership import MembershipLDAPSynchronizer
from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
from authentik.sources.ldap.tasks import ldap_sync, ldap_sync_page
from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
from authentik.sources.ldap.tests.mock_freeipa import mock_freeipa_connection
from authentik.sources.ldap.tests.mock_slapd import (
    group_in_slapd_cn,
    group_in_slapd_uid,
    mock_slapd_connection,
    user_in_slapd_cn,
    user_in_slapd_uid,
)
from authentik.tasks.models import Task

LDAP_PASSWORD = generate_key()

# Dotted path of the method that is replaced by a mock in every test below.
CONNECTION_PATH = "authentik.sources.ldap.models.LDAPSource.connection"
# Common prefix of the `managed` identifier of the blueprint-provided mappings.
MANAGED_PREFIX = "goauthentik.io/sources/ldap/"


class LDAPSyncTests(TestCase):
    """LDAP Sync tests"""

    @apply_blueprint("system/sources-ldap.yaml")
    def setUp(self):
        self.source: LDAPSource = LDAPSource.objects.create(
            name="ldap",
            slug="ldap",
            base_dn="dc=goauthentik,dc=io",
            additional_user_dn="ou=users",
            additional_group_dn="ou=groups",
        )

    # -- helpers ---------------------------------------------------------------

    def _patch_connection(self, ldap_connection):
        """Return a patcher that makes `LDAPSource.connection` return `ldap_connection`"""
        return patch(CONNECTION_PATH, MagicMock(return_value=ldap_connection))

    def _user_mappings(self, vendor):
        """Query the managed default mappings plus the ones of `vendor` ("ms", "openldap")"""
        return LDAPSourcePropertyMapping.objects.filter(
            Q(managed__startswith=f"{MANAGED_PREFIX}default")
            | Q(managed__startswith=f"{MANAGED_PREFIX}{vendor}")
        )

    def _group_mappings(self, name):
        """Query the single managed mapping called `name` (e.g. "openldap-cn")"""
        return LDAPSourcePropertyMapping.objects.filter(managed=f"{MANAGED_PREFIX}{name}")

    def _use_ad_base_dn(self):
        """Point the source at the base DN used by the mock AD tests and save it"""
        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
        self.source.additional_user_dn = ""
        self.source.additional_group_dn = ""
        self.source.save()

    def _save_slapd_settings(self, **extra):
        """Apply the settings shared by the slapd task tests, plus `extra`, and save"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        for field, value in extra.items():
            setattr(self.source, field, value)
        self.source.save()

    def _run_task_sync_against_slapd(self):
        """Dispatch the `ldap_sync` task for the source against the mock slapd"""
        with self._patch_connection(mock_slapd_connection(LDAP_PASSWORD)):
            ldap_sync.send(self.source.pk)

    def _assert_posix_membership(self, attr):
        """Sync users, groups and memberships with posix groups matched on `attr`"""
        self.source.object_uniqueness_field = "cn"
        self.source.group_membership_field = "memberUid"
        self.source.user_object_filter = "(objectClass=posixAccount)"
        self.source.group_object_filter = "(objectClass=posixGroup)"
        self.source.user_membership_attribute = attr
        self.source.user_property_mappings.set(
            [
                *self._user_mappings("openldap").all(),
                LDAPSourcePropertyMapping.objects.create(
                    name="name",
                    # Doubled braces are literal braces in the resulting expression
                    expression=(
                        f'return {{"attributes": {{"{attr}": list_flatten(ldap.get("{attr}"))}}}}'
                    ),
                ),
            ]
        )
        self.source.group_property_mappings.set(self._group_mappings("openldap-cn"))
        with self._patch_connection(mock_slapd_connection(LDAP_PASSWORD)):
            # Saved while the connection is patched, as the individual tests always did
            self.source.save()
            UserLDAPSynchronizer(self.source, Task()).sync_full()
            GroupLDAPSynchronizer(self.source, Task()).sync_full()
            MembershipLDAPSynchronizer(self.source, Task()).sync_full()
            # Test if membership mapping based on memberUid works.
            posix_group = Group.objects.filter(name="group-posix").first()
            # Fail with a clear assertion instead of an AttributeError on None
            self.assertIsNotNone(posix_group)
            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

    # -- synchronizer tests ----------------------------------------------------

    def test_sync_missing_page(self):
        """Test sync with missing page"""
        with self._patch_connection(mock_ad_connection()):
            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")

    def test_sync_error(self):
        """Test that a failing property mapping stops the sync and creates an event"""
        self._use_ad_base_dn()
        mapping = LDAPSourcePropertyMapping.objects.create(
            name="name",
            expression="q",
        )
        # Only the broken mapping is assigned; a previous assignment of the default
        # mappings was immediately overwritten by this call and has been dropped.
        self.source.user_property_mappings.set([mapping])
        self.source.save()
        with self._patch_connection(mock_ad_connection()):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            with self.assertRaises(StopSync):
                user_sync.sync_full()
            self.assertFalse(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())
        events = Event.objects.filter(
            action=EventAction.CONFIGURATION_ERROR,
            context__message="Failed to evaluate property mapping: 'name'",
            context__mapping__pk=mapping.pk.hex,
        )
        self.assertTrue(events.exists())

    def test_sync_mapping(self):
        """Test property mappings"""
        none = LDAPSourcePropertyMapping.objects.create(
            name=generate_id(), expression="return None"
        )
        byte_mapping = LDAPSourcePropertyMapping.objects.create(
            name=generate_id(), expression="return b''"
        )
        self.source.user_property_mappings.set(self._user_mappings("ms"))
        self.source.user_property_mappings.add(none, byte_mapping)

        # we basically just test that the mappings don't throw errors
        with self._patch_connection(mock_ad_connection()):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()

    def test_sync_users_ad(self):
        """Test user sync against the mock AD with an existing source connection"""
        self._use_ad_base_dn()
        self.source.user_property_mappings.set(self._user_mappings("ms"))

        # Create the user beforehand so we can set attributes and check they aren't removed
        user = User.objects.create(
            username="erin.h",
            attributes={
                "foo": "bar",
            },
        )
        UserLDAPSourceConnection.objects.create(
            user=user,
            source=self.source,
            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
        )

        with self._patch_connection(mock_ad_connection()):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()

            user.refresh_from_db()
            self.assertEqual(user.name, "Erin M. Hagens")
            self.assertEqual(user.attributes["foo"], "bar")
            self.assertTrue(user.is_active)
            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")

            deactivated = User.objects.filter(username="deactivated.a").first()
            self.assertIsNotNone(deactivated)
            self.assertFalse(deactivated.is_active)

    def test_sync_ad_legacy(self):
        """Test AD sync of objects identified only by the legacy `ldap_uniq` attribute"""
        self._use_ad_base_dn()
        self.source.user_property_mappings.set(self._user_mappings("ms"))
        self.source.group_property_mappings.set(self._group_mappings("default-name"))

        # Create the user beforehand so we can set attributes and check they aren't removed
        user = User.objects.create(
            username="erin.h",
            attributes={
                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
                "foo": "bar",
            },
        )
        group = Group.objects.create(
            name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
        )

        with self._patch_connection(mock_ad_connection()):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()

            user.refresh_from_db()
            group.refresh_from_db()

            self.assertEqual(user.name, "Erin M. Hagens")
            self.assertEqual(user.attributes["foo"], "bar")
            self.assertTrue(user.is_active)
            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
            self.assertTrue(
                UserLDAPSourceConnection.objects.filter(
                    source=self.source,
                    user=user,
                    identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
                ).exists()
            )

            deactivated = User.objects.filter(username="deactivated.a").first()
            self.assertIsNotNone(deactivated)
            self.assertFalse(deactivated.is_active)

            self.assertEqual(group.name, "Administrators")
            self.assertTrue(
                GroupLDAPSourceConnection.objects.filter(
                    source=self.source, group=group, identifier="S-1-5-32-544"
                ).exists()
            )
            self.assertEqual(group.attributes["foo"], "bar")

    def test_sync_users_openldap(self):
        """Test user sync against the mock slapd"""
        self.source.object_uniqueness_field = "uid"
        self.source.user_property_mappings.set(self._user_mappings("openldap"))
        with self._patch_connection(mock_slapd_connection(LDAP_PASSWORD)):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            self.assertTrue(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())

    def test_sync_users_freeipa_ish(self):
        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
        self.source.object_uniqueness_field = "uid"
        self.source.user_property_mappings.set(self._user_mappings("openldap"))
        with self._patch_connection(mock_freeipa_connection(LDAP_PASSWORD)):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            self.assertTrue(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())
            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)

    def test_sync_groups_freeipa_memberOf(self):
        """Test group sync when membership is derived from memberOf user attribute"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.lookup_groups_from_user = True
        self.source.group_membership_field = "memberOf"
        self.source.user_property_mappings.set(self._user_mappings("openldap"))
        self.source.group_property_mappings.set(self._group_mappings("openldap-cn"))
        with self._patch_connection(mock_freeipa_connection(LDAP_PASSWORD)):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()

            self.assertTrue(
                User.objects.filter(username="user4_sn").exists(), "User does not exist"
            )
            # Test if membership mapping based on memberOf works.
            memberof_group = Group.objects.filter(name="reverse-lookup-group")
            self.assertTrue(memberof_group.exists(), "Group does not exist")
            self.assertTrue(
                memberof_group.first().users.filter(username="user4_sn").exists(),
                "User not a member of the group",
            )

    def test_sync_groups_ad(self):
        """Test group sync against the mock AD with a configured parent group"""
        self._use_ad_base_dn()
        self.source.user_property_mappings.set(self._user_mappings("ms"))
        self.source.group_property_mappings.set(self._group_mappings("default-name"))
        with self._patch_connection(mock_ad_connection()):
            _user = create_test_admin_user()
            parent_group = Group.objects.get(name=_user.username)
            self.source.sync_parent_group = parent_group
            self.source.save()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            group: Group = Group.objects.filter(name="Test Group").first()
            self.assertIsNotNone(group)
            self.assertEqual(group.parents.first(), parent_group)

    def test_sync_groups_openldap(self):
        """Test group sync against the mock slapd"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.user_property_mappings.set(self._user_mappings("openldap"))
        self.source.group_property_mappings.set(self._group_mappings("openldap-cn"))
        with self._patch_connection(mock_slapd_connection(LDAP_PASSWORD)):
            self.source.save()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            group = Group.objects.filter(name="group1")
            self.assertTrue(group.exists())

    def test_sync_groups_openldap_posix_group(self):
        """Test posix group sync"""
        self._assert_posix_membership("uid")

    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
        """Test posix group sync with `cn` as the user membership attribute"""
        self._assert_posix_membership("cn")

    # -- task tests ------------------------------------------------------------

    def test_tasks_ad(self):
        """Test Scheduled tasks"""
        self.source.user_property_mappings.set(self._user_mappings("ms"))
        self.source.save()
        with self._patch_connection(mock_ad_connection()):
            ldap_sync.send(self.source.pk)

    def test_tasks_openldap(self):
        """Test Scheduled tasks"""
        self.source.user_property_mappings.set(self._user_mappings("openldap"))
        self._save_slapd_settings()
        self._run_task_sync_against_slapd()

    def test_user_deletion(self):
        """Test user deletion"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings(delete_not_found_objects=True)

        self._run_task_sync_against_slapd()
        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())

    def test_user_deletion_still_in_source(self):
        """Test that user is not deleted if it's still in the source"""
        username = user_in_slapd_cn
        identifier = user_in_slapd_uid
        user = User.objects.create_user(username=username)
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier=identifier
        )
        self._save_slapd_settings(delete_not_found_objects=True)

        self._run_task_sync_against_slapd()
        self.assertTrue(User.objects.filter(username=username).exists())

    def test_user_deletion_no_sync(self):
        """Test that user is not deleted if sync_users is False"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings(delete_not_found_objects=True, sync_users=False)

        self._run_task_sync_against_slapd()
        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

    def test_user_deletion_no_delete(self):
        """Test that user is not deleted if delete_not_found_objects is False"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings()

        self._run_task_sync_against_slapd()
        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

    def test_group_deletion(self):
        """Test group deletion"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings(delete_not_found_objects=True)

        self._run_task_sync_against_slapd()
        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())

    def test_group_deletion_still_in_source(self):
        """Test that group is not deleted if it's still in the source"""
        groupname = group_in_slapd_cn
        identifier = group_in_slapd_uid
        group = Group.objects.create(name=groupname)
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier=identifier
        )
        self._save_slapd_settings(delete_not_found_objects=True)

        self._run_task_sync_against_slapd()
        self.assertTrue(Group.objects.filter(name=groupname).exists())

    def test_group_deletion_no_sync(self):
        """Test that group is not deleted if sync_groups is False"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings(delete_not_found_objects=True, sync_groups=False)

        self._run_task_sync_against_slapd()
        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

    def test_group_deletion_no_delete(self):
        """Test that group is not deleted if delete_not_found_objects is False"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self._save_slapd_settings()

        self._run_task_sync_against_slapd()
        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

    def test_batch_deletion(self):
        """Test batch deletion"""
        # One more object than a single delete chunk holds
        batch_size = DELETE_CHUNK_SIZE + 1
        for i in range(batch_size):
            user = User.objects.create_user(username=f"not-in-the-source-{i}")
            group = Group.objects.create(name=f"not-in-the-source-{i}")
            group.users.add(user)
            UserLDAPSourceConnection.objects.create(
                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
            )
            GroupLDAPSourceConnection.objects.create(
                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
            )
        self._save_slapd_settings(delete_not_found_objects=True)

        self._run_task_sync_against_slapd()

        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())

    # -- filter escaping -------------------------------------------------------

    def test_membership_sync_special_chars_in_group_dn(self):
        """Test membership synchronization with special characters in group DN"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.lookup_groups_from_user = True
        self.source.group_membership_field = "memberOf"

        # Mock connection; `extend.standard` carries the paged_search method that
        # is expected to be called with the filter
        mock_conn = MagicMock()
        mock_standard = MagicMock()
        mock_conn.extend.standard = mock_standard

        # (group DN, escape sequences expected in the search filter)
        cases = [
            ("cn=test(group),ou=groups,dc=example,dc=com", ("\\28", "\\29")),  # ( and )
            ("cn=test\\group,ou=groups,dc=example,dc=com", ("\\5c",)),  # backslash
            ("cn=test*group,ou=groups,dc=example,dc=com", ("\\2a",)),  # asterisk
        ]

        for group_dn, escaped_sequences in cases:
            with self.subTest(group_dn=group_dn):
                # Forget calls of earlier cases so call_args can never be stale
                mock_standard.reset_mock()
                with self._patch_connection(mock_conn):
                    membership_sync = MembershipLDAPSynchronizer(self.source, Task())
                    try:
                        membership_sync.sync([{"dn": group_dn}])
                    except LDAPInvalidFilterError:
                        self.fail(
                            "LDAPInvalidFilterError should not be raised with escaped filter"
                        )
                mock_standard.paged_search.assert_called()
                search_filter = mock_standard.paged_search.call_args[1]["search_filter"]
                for sequence in escaped_sequences:
                    self.assertIn(sequence, search_filter)

    def test_escape_filter_chars_function(self):
        """Test the escape_filter_chars function directly"""

        # Test various special characters that need escaping
        test_cases = [
            ("test(group)", "test\\28group\\29"),  # parentheses
            ("test\\group", "test\\5cgroup"),  # backslash
            ("test*group", "test\\2agroup"),  # asterisk
            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
            ("normalgroup", "normalgroup"),  # no special chars
            ("", ""),  # empty string
        ]

        for input_str, expected in test_cases:
            with self.subTest(input_str=input_str):
                result = escape_filter_chars(input_str)
                self.assertEqual(result, expected)
class LDAPSyncTests(TestCase):
    """LDAP Sync tests

    Every test patches ``LDAPSource.connection`` with a mocked directory
    (AD, OpenLDAP/slapd or FreeIPA-like) and runs the synchronizers or the
    sync tasks against the source created in ``setUp``.
    """

    @apply_blueprint("system/sources-ldap.yaml")
    def setUp(self):
        """Create the LDAP source that all tests operate on"""
        self.source: LDAPSource = LDAPSource.objects.create(
            name="ldap",
            slug="ldap",
            base_dn="dc=goauthentik,dc=io",
            additional_user_dn="ou=users",
            additional_group_dn="ou=groups",
        )

    def test_sync_missing_page(self):
        """Test sync with missing page"""
        connection = MagicMock(return_value=mock_ad_connection())
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync_page.send(self.source.pk, class_to_path(UserLDAPSynchronizer), "foo")

    def test_sync_error(self):
        """Test that a property mapping which fails to evaluate stops the user sync"""
        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
        self.source.additional_user_dn = ""
        self.source.additional_group_dn = ""
        self.source.save()
        # Only the broken mapping is assigned; the test expects its evaluation to fail.
        mapping = LDAPSourcePropertyMapping.objects.create(
            name="name",
            expression="q",
        )
        self.source.user_property_mappings.set([mapping])
        self.source.save()
        connection = MagicMock(return_value=mock_ad_connection())
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            with self.assertRaises(StopSync):
                user_sync.sync_full()
            self.assertFalse(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())
        # The failure must be recorded as a configuration error event for this mapping
        events = Event.objects.filter(
            action=EventAction.CONFIGURATION_ERROR,
            context__message="Failed to evaluate property mapping: 'name'",
            context__mapping__pk=mapping.pk.hex,
        )
        self.assertTrue(events.exists())

    def test_sync_mapping(self):
        """Test property mappings"""
        none = LDAPSourcePropertyMapping.objects.create(
            name=generate_id(), expression="return None"
        )
        byte_mapping = LDAPSourcePropertyMapping.objects.create(
            name=generate_id(), expression="return b''"
        )
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
            )
        )
        self.source.user_property_mappings.add(none, byte_mapping)
        connection = MagicMock(return_value=mock_ad_connection())

        # we basically just test that the mappings don't throw errors
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()

    def test_sync_users_ad(self):
        """Test user sync against the mocked AD connection"""
        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
        self.source.additional_user_dn = ""
        self.source.additional_group_dn = ""
        self.source.save()
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
            )
        )
        connection = MagicMock(return_value=mock_ad_connection())

        # Create the user beforehand so we can set attributes and check they aren't removed
        user = User.objects.create(
            username="erin.h",
            attributes={
                "foo": "bar",
            },
        )
        UserLDAPSourceConnection.objects.create(
            user=user,
            source=self.source,
            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
        )

        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()

            user.refresh_from_db()
            self.assertEqual(user.name, "Erin M. Hagens")
            self.assertEqual(user.attributes["foo"], "bar")
            self.assertTrue(user.is_active)
            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")

            deactivated = User.objects.filter(username="deactivated.a").first()
            self.assertIsNotNone(deactivated)
            self.assertFalse(deactivated.is_active)

    def test_sync_ad_legacy(self):
        """Test sync of users and groups that only carry the legacy `ldap_uniq` attribute"""
        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
        self.source.additional_user_dn = ""
        self.source.additional_group_dn = ""
        self.source.save()
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
            )
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/default-name"
            )
        )
        connection = MagicMock(return_value=mock_ad_connection())

        # Create the user beforehand so we can set attributes and check they aren't removed
        user = User.objects.create(
            username="erin.h",
            attributes={
                "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
                "foo": "bar",
            },
        )
        group = Group.objects.create(
            name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
        )

        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()

            user.refresh_from_db()
            group.refresh_from_db()

            self.assertEqual(user.name, "Erin M. Hagens")
            self.assertEqual(user.attributes["foo"], "bar")
            self.assertTrue(user.is_active)
            self.assertEqual(user.path, "goauthentik.io/sources/ldap/ak-test")
            # No connection object was created above, so the sync must have created it
            self.assertTrue(
                UserLDAPSourceConnection.objects.filter(
                    source=self.source,
                    user=user,
                    identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
                ).exists()
            )

            deactivated = User.objects.filter(username="deactivated.a").first()
            self.assertIsNotNone(deactivated)
            self.assertFalse(deactivated.is_active)

            self.assertEqual(group.name, "Administrators")
            self.assertTrue(
                GroupLDAPSourceConnection.objects.filter(
                    source=self.source, group=group, identifier="S-1-5-32-544"
                ).exists()
            )
            self.assertEqual(group.attributes["foo"], "bar")

    def test_sync_users_openldap(self):
        """Test user sync against the mocked slapd connection"""
        self.source.object_uniqueness_field = "uid"
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            )
        )
        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            self.assertTrue(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())

    def test_sync_users_freeipa_ish(self):
        """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
        self.source.object_uniqueness_field = "uid"
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            )
        )
        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            self.assertTrue(User.objects.filter(username="user0_sn").exists())
            self.assertFalse(User.objects.filter(username="user1_sn").exists())
            self.assertFalse(User.objects.get(username="user-nsaccountlock").is_active)

    def test_sync_groups_freeipa_memberOf(self):
        """Test group sync when membership is derived from memberOf user attribute"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.lookup_groups_from_user = True
        self.source.group_membership_field = "memberOf"
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            )
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/openldap-cn"
            )
        )
        connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()

            self.assertTrue(
                User.objects.filter(username="user4_sn").exists(), "User does not exist"
            )
            # Test if membership mapping based on memberOf works.
            memberof_group = Group.objects.filter(name="reverse-lookup-group")
            self.assertTrue(memberof_group.exists(), "Group does not exist")
            self.assertTrue(
                memberof_group.first().users.filter(username="user4_sn").exists(),
                "User not a member of the group",
            )

    def test_sync_groups_ad(self):
        """Test group sync against the mocked AD connection, including the sync parent group"""
        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
        self.source.additional_user_dn = ""
        self.source.additional_group_dn = ""
        self.source.save()
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
            )
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/default-name"
            )
        )
        connection = MagicMock(return_value=mock_ad_connection())
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            _user = create_test_admin_user()
            parent_group = Group.objects.get(name=_user.username)
            self.source.sync_parent_group = parent_group
            self.source.save()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            group: Group = Group.objects.filter(name="Test Group").first()
            self.assertIsNotNone(group)
            self.assertEqual(group.parents.first(), parent_group)

    def test_sync_groups_openldap(self):
        """Test group sync against the mocked slapd connection"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            )
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/openldap-cn"
            )
        )
        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            self.source.save()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            group = Group.objects.filter(name="group1")
            self.assertTrue(group.exists())

    def test_sync_groups_openldap_posix_group(self):
        """Test posix group sync"""
        self.source.object_uniqueness_field = "cn"
        self.source.group_membership_field = "memberUid"
        self.source.user_object_filter = "(objectClass=posixAccount)"
        self.source.group_object_filter = "(objectClass=posixGroup)"
        self.source.user_membership_attribute = "uid"
        self.source.user_property_mappings.set(
            [
                *LDAPSourcePropertyMapping.objects.filter(
                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
                ).all(),
                LDAPSourcePropertyMapping.objects.create(
                    name="name",
                    expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
                ),
            ]
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/openldap-cn"
            )
        )
        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            self.source.save()
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            # Test if membership mapping based on memberUid works.
            posix_group = Group.objects.filter(name="group-posix").first()
            # Fail with a clear assertion instead of an AttributeError if the group is missing
            self.assertIsNotNone(posix_group)
            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

    def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
        """Test posix group sync when `user_membership_attribute` is `cn` instead of `uid`"""
        self.source.object_uniqueness_field = "cn"
        self.source.group_membership_field = "memberUid"
        self.source.user_object_filter = "(objectClass=posixAccount)"
        self.source.group_object_filter = "(objectClass=posixGroup)"
        self.source.user_membership_attribute = "cn"
        self.source.user_property_mappings.set(
            [
                *LDAPSourcePropertyMapping.objects.filter(
                    Q(managed__startswith="goauthentik.io/sources/ldap/default")
                    | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
                ).all(),
                LDAPSourcePropertyMapping.objects.create(
                    name="name",
                    expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
                ),
            ]
        )
        self.source.group_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                managed="goauthentik.io/sources/ldap/openldap-cn"
            )
        )
        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            self.source.save()
            user_sync = UserLDAPSynchronizer(self.source, Task())
            user_sync.sync_full()
            group_sync = GroupLDAPSynchronizer(self.source, Task())
            group_sync.sync_full()
            membership_sync = MembershipLDAPSynchronizer(self.source, Task())
            membership_sync.sync_full()
            # Test if membership mapping based on memberUid works.
            posix_group = Group.objects.filter(name="group-posix").first()
            # Fail with a clear assertion instead of an AttributeError if the group is missing
            self.assertIsNotNone(posix_group)
            self.assertTrue(posix_group.users.filter(name="user-posix").exists())

    def test_tasks_ad(self):
        """Test Scheduled tasks (mocked AD connection)"""
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
            )
        )
        self.source.save()
        connection = MagicMock(return_value=mock_ad_connection())
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)

    def test_tasks_openldap(self):
        """Test Scheduled tasks (mocked slapd connection)"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.user_property_mappings.set(
            LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            )
        )
        self.source.save()
        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)

    def test_user_deletion(self):
        """Test user deletion"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertFalse(User.objects.filter(username="not-in-the-source").exists())

    def test_user_deletion_still_in_source(self):
        """Test that user is not deleted if it's still in the source"""
        username = user_in_slapd_cn
        identifier = user_in_slapd_uid
        user = User.objects.create_user(username=username)
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier=identifier
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(User.objects.filter(username=username).exists())

    def test_user_deletion_no_sync(self):
        """Test that user is not deleted if sync_users is False"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.sync_users = False
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

    def test_user_deletion_no_delete(self):
        """Test that user is not deleted if delete_not_found_objects is False"""
        user = User.objects.create_user(username="not-in-the-source")
        UserLDAPSourceConnection.objects.create(
            user=user, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(User.objects.filter(username="not-in-the-source").exists())

    def test_group_deletion(self):
        """Test group deletion"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertFalse(Group.objects.filter(name="not-in-the-source").exists())

    def test_group_deletion_still_in_source(self):
        """Test that group is not deleted if it's still in the source"""
        groupname = group_in_slapd_cn
        identifier = group_in_slapd_uid
        group = Group.objects.create(name=groupname)
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier=identifier
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(Group.objects.filter(name=groupname).exists())

    def test_group_deletion_no_sync(self):
        """Test that group is not deleted if sync_groups is False"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.sync_groups = False
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

    def test_group_deletion_no_delete(self):
        """Test that group is not deleted if delete_not_found_objects is False"""
        group = Group.objects.create(name="not-in-the-source")
        GroupLDAPSourceConnection.objects.create(
            group=group, source=self.source, identifier="not-in-the-source"
        )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)
        self.assertTrue(Group.objects.filter(name="not-in-the-source").exists())

    def test_batch_deletion(self):
        """Test batch deletion"""
        # One more object than a single delete chunk holds
        batch_size = DELETE_CHUNK_SIZE + 1
        for i in range(batch_size):
            user = User.objects.create_user(username=f"not-in-the-source-{i}")
            group = Group.objects.create(name=f"not-in-the-source-{i}")
            group.users.add(user)
            UserLDAPSourceConnection.objects.create(
                user=user, source=self.source, identifier=f"not-in-the-source-{i}-user"
            )
            GroupLDAPSourceConnection.objects.create(
                group=group, source=self.source, identifier=f"not-in-the-source-{i}-group"
            )
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.delete_not_found_objects = True
        self.source.save()

        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
            ldap_sync.send(self.source.pk)

        self.assertFalse(User.objects.filter(username__startswith="not-in-the-source").exists())
        self.assertFalse(Group.objects.filter(name__startswith="not-in-the-source").exists())

    def test_membership_sync_special_chars_in_group_dn(self):
        """Test membership synchronization with special characters in group DN"""
        self.source.object_uniqueness_field = "uid"
        self.source.group_object_filter = "(objectClass=groupOfNames)"
        self.source.lookup_groups_from_user = True
        self.source.group_membership_field = "memberOf"

        # Mock connection; the search filter is read back from the paged_search call
        mock_conn = MagicMock()
        mock_standard = MagicMock()
        mock_conn.extend.standard = mock_standard

        # (group DN, escape sequences that must appear in the search filter)
        cases = [
            ("cn=test(group),ou=groups,dc=example,dc=com", ("\\28", "\\29")),  # ( and )
            ("cn=test\\group,ou=groups,dc=example,dc=com", ("\\5c",)),  # backslash
            ("cn=test*group,ou=groups,dc=example,dc=com", ("\\2a",)),  # asterisk
        ]
        for group_dn, expected_escapes in cases:
            with self.subTest(group_dn=group_dn):
                # Drop calls recorded by earlier cases so call_args belongs to this DN
                mock_standard.reset_mock()
                with patch(
                    "authentik.sources.ldap.models.LDAPSource.connection",
                    return_value=mock_conn,
                ):
                    membership_sync = MembershipLDAPSynchronizer(self.source, Task())
                    try:
                        membership_sync.sync([{"dn": group_dn}])
                    except LDAPInvalidFilterError:
                        self.fail(
                            "LDAPInvalidFilterError should not be raised with escaped filter"
                        )
                mock_standard.paged_search.assert_called()
                search_filter = mock_standard.paged_search.call_args[1]["search_filter"]
                for escaped in expected_escapes:
                    self.assertIn(escaped, search_filter)

    def test_escape_filter_chars_function(self):
        """Test the escape_filter_chars function directly"""

        # Test various special characters that need escaping
        test_cases = [
            ("test(group)", "test\\28group\\29"),  # parentheses
            ("test\\group", "test\\5cgroup"),  # backslash
            ("test*group", "test\\2agroup"),  # asterisk
            ("test(*)group", "test\\28\\2a\\29group"),  # multiple special chars
            ("normalgroup", "normalgroup"),  # no special chars
            ("", ""),  # empty string
        ]

        for input_str, expected in test_cases:
            with self.subTest(input_str=input_str):
                result = escape_filter_chars(input_str)
                self.assertEqual(result, expected)
LDAP Sync tests
@apply_blueprint("system/sources-ldap.yaml")
def setUp(self):
    """Create the LDAP source shared by all tests and store it on ``self.source``."""
    source_settings = {
        "name": "ldap",
        "slug": "ldap",
        "base_dn": "dc=goauthentik,dc=io",
        "additional_user_dn": "ou=users",
        "additional_group_dn": "ou=groups",
    }
    self.source: LDAPSource = LDAPSource.objects.create(**source_settings)
Hook method for setting up the test fixture before exercising it.
def test_sync_missing_page(self):
    """Test that syncing a page which does not exist runs without raising"""
    ad_connection = MagicMock(return_value=mock_ad_connection())
    synchronizer_path = class_to_path(UserLDAPSynchronizer)
    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        # No assertions: the task only has to complete for the page key "foo"
        ldap_sync_page.send(self.source.pk, synchronizer_path, "foo")
Test sync with missing page
def test_sync_error(self):
    """Test that a property mapping which fails to evaluate stops the user sync"""
    self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
    self.source.additional_user_dn = ""
    self.source.additional_group_dn = ""
    self.source.save()
    # Only the broken mapping is assigned; the test expects its evaluation to fail.
    # (Assigning the managed mappings first was dead code: ``set`` replaces them.)
    mapping = LDAPSourcePropertyMapping.objects.create(
        name="name",
        expression="q",
    )
    self.source.user_property_mappings.set([mapping])
    self.source.save()
    connection = MagicMock(return_value=mock_ad_connection())
    with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
        user_sync = UserLDAPSynchronizer(self.source, Task())
        with self.assertRaises(StopSync):
            user_sync.sync_full()
        self.assertFalse(User.objects.filter(username="user0_sn").exists())
        self.assertFalse(User.objects.filter(username="user1_sn").exists())
    # The failure must be recorded as a configuration error event for this mapping
    events = Event.objects.filter(
        action=EventAction.CONFIGURATION_ERROR,
        context__message="Failed to evaluate property mapping: 'name'",
        context__mapping__pk=mapping.pk.hex,
    )
    self.assertTrue(events.exists())
Test that a property mapping which fails to evaluate stops the user sync
def test_sync_mapping(self):
    """Test that mappings returning ``None`` or empty bytes do not break the user sync"""
    mappings = LDAPSourcePropertyMapping.objects
    returns_none = mappings.create(name=generate_id(), expression="return None")
    returns_bytes = mappings.create(name=generate_id(), expression="return b''")
    default_or_ms = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/ms"
    )
    self.source.user_property_mappings.set(mappings.filter(default_or_ms))
    self.source.user_property_mappings.add(returns_none, returns_bytes)
    ad_connection = MagicMock(return_value=mock_ad_connection())

    # No assertions on purpose: the sync only has to finish without raising
    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        UserLDAPSynchronizer(self.source, Task()).sync_full()
Test property mappings
def test_sync_users_ad(self):
    """Test user sync against the mocked AD connection"""
    source = self.source
    source.base_dn = "dc=t,dc=goauthentik,dc=io"
    source.additional_user_dn = ""
    source.additional_group_dn = ""
    source.save()
    default_or_ms = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/ms"
    )
    source.user_property_mappings.set(LDAPSourcePropertyMapping.objects.filter(default_or_ms))
    ad_connection = MagicMock(return_value=mock_ad_connection())

    # The user exists before the sync so we can verify custom attributes survive it
    erin = User.objects.create(username="erin.h", attributes={"foo": "bar"})
    UserLDAPSourceConnection.objects.create(
        user=erin,
        source=source,
        identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
    )

    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        UserLDAPSynchronizer(source, Task()).sync_full()

        erin.refresh_from_db()
        self.assertEqual(erin.name, "Erin M. Hagens")
        self.assertEqual(erin.attributes["foo"], "bar")
        self.assertTrue(erin.is_active)
        self.assertEqual(erin.path, "goauthentik.io/sources/ldap/ak-test")

        disabled_user = User.objects.filter(username="deactivated.a").first()
        self.assertIsNotNone(disabled_user)
        self.assertFalse(disabled_user.is_active)
Test user sync
def test_sync_ad_legacy(self):
    """Test sync of users and groups that only carry the legacy `ldap_uniq` attribute"""
    source = self.source
    source.base_dn = "dc=t,dc=goauthentik,dc=io"
    source.additional_user_dn = ""
    source.additional_group_dn = ""
    source.save()
    default_or_ms = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/ms"
    )
    source.user_property_mappings.set(LDAPSourcePropertyMapping.objects.filter(default_or_ms))
    source.group_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(
            managed="goauthentik.io/sources/ldap/default-name"
        )
    )
    ad_connection = MagicMock(return_value=mock_ad_connection())

    # Both objects exist before the sync, identified only through `ldap_uniq`;
    # "foo" lets us check that unrelated attributes are kept
    erin = User.objects.create(
        username="erin.h",
        attributes={
            "ldap_uniq": "S-1-5-21-1955698215-2946288202-2760262721-1114",
            "foo": "bar",
        },
    )
    admins = Group.objects.create(
        name="Administrators", attributes={"ldap_uniq": "S-1-5-32-544", "foo": "bar"}
    )

    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        for synchronizer in (UserLDAPSynchronizer, GroupLDAPSynchronizer):
            synchronizer(source, Task()).sync_full()

        erin.refresh_from_db()
        admins.refresh_from_db()

        self.assertEqual(erin.name, "Erin M. Hagens")
        self.assertEqual(erin.attributes["foo"], "bar")
        self.assertTrue(erin.is_active)
        self.assertEqual(erin.path, "goauthentik.io/sources/ldap/ak-test")
        # No connection was created above, so it has to come from the sync
        user_connections = UserLDAPSourceConnection.objects.filter(
            source=source,
            user=erin,
            identifier="S-1-5-21-1955698215-2946288202-2760262721-1114",
        )
        self.assertTrue(user_connections.exists())

        disabled_user = User.objects.filter(username="deactivated.a").first()
        self.assertIsNotNone(disabled_user)
        self.assertFalse(disabled_user.is_active)

        self.assertEqual(admins.name, "Administrators")
        group_connections = GroupLDAPSourceConnection.objects.filter(
            source=source, group=admins, identifier="S-1-5-32-544"
        )
        self.assertTrue(group_connections.exists())
        self.assertEqual(admins.attributes["foo"], "bar")
Test sync of users and groups that only carry the legacy `ldap_uniq` attribute
def test_sync_users_openldap(self):
    """Test user sync against the mocked slapd connection"""
    source = self.source
    source.object_uniqueness_field = "uid"
    default_or_openldap = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/openldap"
    )
    source.user_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(default_or_openldap)
    )
    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        UserLDAPSynchronizer(source, Task()).sync_full()
        # user0_sn is expected to be imported, user1_sn is not
        self.assertTrue(User.objects.filter(username="user0_sn").exists())
        self.assertFalse(User.objects.filter(username="user1_sn").exists())
Test user sync
def test_sync_users_freeipa_ish(self):
    """Test user sync (FreeIPA-ish), mainly testing vendor quirks"""
    source = self.source
    source.object_uniqueness_field = "uid"
    default_or_openldap = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/openldap"
    )
    source.user_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(default_or_openldap)
    )
    freeipa_connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", freeipa_connection):
        UserLDAPSynchronizer(source, Task()).sync_full()
        self.assertTrue(User.objects.filter(username="user0_sn").exists())
        self.assertFalse(User.objects.filter(username="user1_sn").exists())
        # The "user-nsaccountlock" account must end up inactive after the sync
        locked_user = User.objects.get(username="user-nsaccountlock")
        self.assertFalse(locked_user.is_active)
Test user sync (FreeIPA-ish), mainly testing vendor quirks
def test_sync_groups_freeipa_memberOf(self):
    """Test that group membership is derived from the users' memberOf attribute"""
    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.lookup_groups_from_user = True
    source.group_membership_field = "memberOf"
    default_or_openldap = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/openldap"
    )
    source.user_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(default_or_openldap)
    )
    source.group_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(
            managed="goauthentik.io/sources/ldap/openldap-cn"
        )
    )
    freeipa_connection = MagicMock(return_value=mock_freeipa_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", freeipa_connection):
        # Run the synchronizers in sequence: users, groups, then memberships
        for synchronizer in (
            UserLDAPSynchronizer,
            GroupLDAPSynchronizer,
            MembershipLDAPSynchronizer,
        ):
            synchronizer(source, Task()).sync_full()

        user_exists = User.objects.filter(username="user4_sn").exists()
        self.assertTrue(user_exists, "User does not exist")
        # Check that the memberOf-based membership mapping linked user and group
        reverse_lookup_groups = Group.objects.filter(name="reverse-lookup-group")
        self.assertTrue(reverse_lookup_groups.exists(), "Group does not exist")
        is_member = reverse_lookup_groups.first().users.filter(username="user4_sn").exists()
        self.assertTrue(is_member, "User not a member of the group")
Test group sync when membership is derived from memberOf user attribute
def test_sync_groups_ad(self):
    """Sync groups and memberships against the Active Directory mock.

    The source's ``sync_parent_group`` is set to the group named after a
    freshly created test admin user. After the sync, the group ``Test Group``
    must exist and have that group as its first parent.
    """
    source = self.source
    source.base_dn = "dc=t,dc=goauthentik,dc=io"
    source.additional_user_dn = ""
    source.additional_group_dn = ""
    source.save()

    mappings = LDAPSourcePropertyMapping.objects
    user_mapping_filter = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/ms"
    )
    source.user_property_mappings.set(mappings.filter(user_mapping_filter))
    source.group_property_mappings.set(
        mappings.filter(managed="goauthentik.io/sources/ldap/default-name")
    )

    ad_connection = MagicMock(return_value=mock_ad_connection())
    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        admin = create_test_admin_user()
        parent_group = Group.objects.get(name=admin.username)
        source.sync_parent_group = parent_group
        source.save()
        for synchronizer_cls in (GroupLDAPSynchronizer, MembershipLDAPSynchronizer):
            synchronizer_cls(source, Task()).sync_full()
        synced_group = Group.objects.filter(name="Test Group").first()
        self.assertIsNotNone(synced_group)
        self.assertEqual(synced_group.parents.first(), parent_group)
Test group sync
def test_sync_groups_openldap(self):
    """Sync groups and memberships against the slapd mock.

    Passes when a group named ``group1`` exists after the sync.
    """
    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"

    mappings = LDAPSourcePropertyMapping.objects
    user_mapping_filter = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/openldap"
    )
    source.user_property_mappings.set(mappings.filter(user_mapping_filter))
    source.group_property_mappings.set(
        mappings.filter(managed="goauthentik.io/sources/ldap/openldap-cn")
    )

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        # Saved while the connection is patched, as in the rest of this test.
        source.save()
        for synchronizer_cls in (GroupLDAPSynchronizer, MembershipLDAPSynchronizer):
            synchronizer_cls(source, Task()).sync_full()
        self.assertTrue(Group.objects.filter(name="group1").exists())
Test group sync
def test_sync_groups_openldap_posix_group(self):
    """Test posix group sync with ``memberUid`` matched against the user's ``uid``.

    The source uses ``posixAccount``/``posixGroup`` object filters, reads
    membership from ``memberUid`` and matches it against the ``uid`` user
    attribute, which an extra property mapping copies into the user's
    attributes. After the user, group and membership syncs, ``group-posix``
    must exist and contain the user named ``user-posix``.
    """
    self.source.object_uniqueness_field = "cn"
    self.source.group_membership_field = "memberUid"
    self.source.user_object_filter = "(objectClass=posixAccount)"
    self.source.group_object_filter = "(objectClass=posixGroup)"
    self.source.user_membership_attribute = "uid"
    self.source.user_property_mappings.set(
        [
            *LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            ).all(),
            # Expose the LDAP ``uid`` as a user attribute so membership can match on it.
            LDAPSourcePropertyMapping.objects.create(
                name="name",
                expression='return {"attributes": {"uid": list_flatten(ldap.get("uid"))}}',
            ),
        ]
    )
    self.source.group_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(
            managed="goauthentik.io/sources/ldap/openldap-cn"
        )
    )
    connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
        self.source.save()
        user_sync = UserLDAPSynchronizer(self.source, Task())
        user_sync.sync_full()
        group_sync = GroupLDAPSynchronizer(self.source, Task())
        group_sync.sync_full()
        membership_sync = MembershipLDAPSynchronizer(self.source, Task())
        membership_sync.sync_full()
        # Test if membership mapping based on memberUid works.
        posix_group = Group.objects.filter(name="group-posix").first()
        # Fail with a clear assertion instead of an AttributeError on None.
        self.assertIsNotNone(posix_group, "Group does not exist")
        self.assertTrue(posix_group.users.filter(name="user-posix").exists())
Test posix group sync
def test_sync_groups_openldap_posix_group_nonstandard_membership_attribute(self):
    """Test posix group sync with ``memberUid`` matched against the user's ``cn``.

    Same setup as the plain posix group test, except that
    ``user_membership_attribute`` is ``cn`` instead of ``uid`` and the extra
    property mapping copies ``cn`` into the user's attributes. After the user,
    group and membership syncs, ``group-posix`` must exist and contain the
    user named ``user-posix``.
    """
    self.source.object_uniqueness_field = "cn"
    self.source.group_membership_field = "memberUid"
    self.source.user_object_filter = "(objectClass=posixAccount)"
    self.source.group_object_filter = "(objectClass=posixGroup)"
    self.source.user_membership_attribute = "cn"
    self.source.user_property_mappings.set(
        [
            *LDAPSourcePropertyMapping.objects.filter(
                Q(managed__startswith="goauthentik.io/sources/ldap/default")
                | Q(managed__startswith="goauthentik.io/sources/ldap/openldap")
            ).all(),
            # Expose the LDAP ``cn`` as a user attribute so membership can match on it.
            LDAPSourcePropertyMapping.objects.create(
                name="name",
                expression='return {"attributes": {"cn": list_flatten(ldap.get("cn"))}}',
            ),
        ]
    )
    self.source.group_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(
            managed="goauthentik.io/sources/ldap/openldap-cn"
        )
    )
    connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
        self.source.save()
        user_sync = UserLDAPSynchronizer(self.source, Task())
        user_sync.sync_full()
        group_sync = GroupLDAPSynchronizer(self.source, Task())
        group_sync.sync_full()
        membership_sync = MembershipLDAPSynchronizer(self.source, Task())
        membership_sync.sync_full()
        # Test if membership mapping based on memberUid works.
        posix_group = Group.objects.filter(name="group-posix").first()
        # Fail with a clear assertion instead of an AttributeError on None.
        self.assertIsNotNone(posix_group, "Group does not exist")
        self.assertTrue(posix_group.users.filter(name="user-posix").exists())
Test posix group sync
def test_tasks_ad(self):
    """Dispatch the ``ldap_sync`` task for the source using the AD mock.

    The test makes no explicit assertions; it only sends the task with the
    default and ``ms`` user property mappings configured.
    """
    mapping_filter = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/ms"
    )
    self.source.user_property_mappings.set(
        LDAPSourcePropertyMapping.objects.filter(mapping_filter)
    )
    self.source.save()

    ad_connection = MagicMock(return_value=mock_ad_connection())
    with patch("authentik.sources.ldap.models.LDAPSource.connection", ad_connection):
        ldap_sync.send(self.source.pk)
Test Scheduled tasks
def test_tasks_openldap(self):
    """Dispatch the ``ldap_sync`` task for the source using the slapd mock.

    The test makes no explicit assertions; it only sends the task with the
    default and ``openldap`` user property mappings configured.
    """
    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    mapping_filter = Q(managed__startswith="goauthentik.io/sources/ldap/default") | Q(
        managed__startswith="goauthentik.io/sources/ldap/openldap"
    )
    source.user_property_mappings.set(LDAPSourcePropertyMapping.objects.filter(mapping_filter))
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
Test Scheduled tasks
def test_user_deletion(self):
    """A source-linked user named ``not-in-the-source`` is deleted by the sync.

    ``delete_not_found_objects`` is enabled on the source before the
    ``ldap_sync`` task is sent.
    """
    stale_name = "not-in-the-source"
    UserLDAPSourceConnection.objects.create(
        user=User.objects.create_user(username=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertFalse(User.objects.filter(username=stale_name).exists())
Test user deletion
def test_user_deletion_still_in_source(self):
    """A source-linked user that the slapd mock still provides survives the sync.

    The user is created with the ``user_in_slapd_cn`` username and linked via
    the ``user_in_slapd_uid`` identifier; deletion of missing objects is enabled.
    """
    existing_user = User.objects.create_user(username=user_in_slapd_cn)
    UserLDAPSourceConnection.objects.create(
        user=existing_user,
        source=self.source,
        identifier=user_in_slapd_uid,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(User.objects.filter(username=user_in_slapd_cn).exists())
Test that user is not deleted if it's still in the source
def test_user_deletion_no_sync(self):
    """With ``sync_users`` disabled, the ``not-in-the-source`` user is kept.

    ``delete_not_found_objects`` is enabled, but ``sync_users`` is set to
    ``False`` before the ``ldap_sync`` task is sent.
    """
    stale_name = "not-in-the-source"
    UserLDAPSourceConnection.objects.create(
        user=User.objects.create_user(username=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.sync_users = False
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(User.objects.filter(username=stale_name).exists())
Test that user is not deleted if sync_users is False
def test_user_deletion_no_delete(self):
    """Without enabling ``delete_not_found_objects``, the user is kept.

    The test leaves ``delete_not_found_objects`` at whatever value the source
    was created with and expects ``not-in-the-source`` to survive the sync.
    """
    stale_name = "not-in-the-source"
    UserLDAPSourceConnection.objects.create(
        user=User.objects.create_user(username=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(User.objects.filter(username=stale_name).exists())
Test that user is not deleted if delete_not_found_objects is False
def test_group_deletion(self):
    """A source-linked group named ``not-in-the-source`` is deleted by the sync.

    ``delete_not_found_objects`` is enabled on the source before the
    ``ldap_sync`` task is sent.
    """
    stale_name = "not-in-the-source"
    GroupLDAPSourceConnection.objects.create(
        group=Group.objects.create(name=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertFalse(Group.objects.filter(name=stale_name).exists())
Test group deletion
def test_group_deletion_still_in_source(self):
    """A source-linked group that the slapd mock still provides survives the sync.

    The group is created with the ``group_in_slapd_cn`` name and linked via
    the ``group_in_slapd_uid`` identifier; deletion of missing objects is enabled.
    """
    existing_group = Group.objects.create(name=group_in_slapd_cn)
    GroupLDAPSourceConnection.objects.create(
        group=existing_group,
        source=self.source,
        identifier=group_in_slapd_uid,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(Group.objects.filter(name=group_in_slapd_cn).exists())
Test that group is not deleted if it's still in the source
def test_group_deletion_no_sync(self):
    """With ``sync_groups`` disabled, the ``not-in-the-source`` group is kept.

    ``delete_not_found_objects`` is enabled, but ``sync_groups`` is set to
    ``False`` before the ``ldap_sync`` task is sent.
    """
    stale_name = "not-in-the-source"
    GroupLDAPSourceConnection.objects.create(
        group=Group.objects.create(name=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.sync_groups = False
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(Group.objects.filter(name=stale_name).exists())
Test that group is not deleted if sync_groups is False
def test_group_deletion_no_delete(self):
    """Without enabling ``delete_not_found_objects``, the group is kept.

    The test leaves ``delete_not_found_objects`` at whatever value the source
    was created with and expects ``not-in-the-source`` to survive the sync.
    """
    stale_name = "not-in-the-source"
    GroupLDAPSourceConnection.objects.create(
        group=Group.objects.create(name=stale_name),
        source=self.source,
        identifier=stale_name,
    )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)
    self.assertTrue(Group.objects.filter(name=stale_name).exists())
Test that group is not deleted if delete_not_found_objects is False
def test_batch_deletion(self):
    """Delete more stale users and groups than fit in one delete chunk.

    Creates ``DELETE_CHUNK_SIZE + 1`` user/group pairs (each user added to its
    group), links all of them to the source, and expects none of the
    ``not-in-the-source*`` users or groups to remain after ``ldap_sync``.
    """
    # One more than the chunk size, so deletion cannot finish in a single chunk.
    object_count = DELETE_CHUNK_SIZE + 1
    for index in range(object_count):
        stale_user = User.objects.create_user(username=f"not-in-the-source-{index}")
        stale_group = Group.objects.create(name=f"not-in-the-source-{index}")
        stale_group.users.add(stale_user)
        UserLDAPSourceConnection.objects.create(
            user=stale_user,
            source=self.source,
            identifier=f"not-in-the-source-{index}-user",
        )
        GroupLDAPSourceConnection.objects.create(
            group=stale_group,
            source=self.source,
            identifier=f"not-in-the-source-{index}-group",
        )

    source = self.source
    source.object_uniqueness_field = "uid"
    source.group_object_filter = "(objectClass=groupOfNames)"
    source.delete_not_found_objects = True
    source.save()

    slapd_connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
    with patch("authentik.sources.ldap.models.LDAPSource.connection", slapd_connection):
        ldap_sync.send(source.pk)

    remaining_users = User.objects.filter(username__startswith="not-in-the-source")
    remaining_groups = Group.objects.filter(name__startswith="not-in-the-source")
    self.assertFalse(remaining_users.exists())
    self.assertFalse(remaining_groups.exists())
Test batch deletion
def test_membership_sync_special_chars_in_group_dn(self):
    """Test membership synchronization with special characters in group DN.

    For each group DN containing LDAP filter metacharacters (parentheses,
    backslash, asterisk) the membership synchronizer is run on a single-entry
    page. The sync must not raise ``LDAPInvalidFilterError``, and the
    ``search_filter`` passed to ``paged_search`` must contain the escaped form
    of each metacharacter.

    Each DN runs in its own ``subTest`` with the search mock reset first, so a
    case can never pass or fail based on a call recorded by an earlier case.
    """
    self.source.object_uniqueness_field = "uid"
    self.source.group_object_filter = "(objectClass=groupOfNames)"
    self.source.lookup_groups_from_user = True
    self.source.group_membership_field = "memberOf"

    # Mock connection; the assertions below inspect the paged_search call
    # made through ``connection.extend.standard``.
    mock_conn = MagicMock()
    mock_standard = MagicMock()
    mock_conn.extend.standard = mock_standard

    # (group DN, escaped sequences expected in the resulting search filter)
    cases = [
        ("cn=test(group),ou=groups,dc=example,dc=com", ("\\28", "\\29")),  # ( and )
        ("cn=test\\group,ou=groups,dc=example,dc=com", ("\\5c",)),  # backslash
        ("cn=test*group,ou=groups,dc=example,dc=com", ("\\2a",)),  # asterisk
    ]

    for group_dn, escaped_sequences in cases:
        with self.subTest(group_dn=group_dn):
            # Drop calls recorded by previous cases so call_args is never stale.
            mock_standard.reset_mock()
            with patch(
                "authentik.sources.ldap.models.LDAPSource.connection",
                return_value=mock_conn,
            ):
                membership_sync = MembershipLDAPSynchronizer(self.source, Task())
                # Only the sync call is guarded; assertion failures propagate as-is.
                try:
                    membership_sync.sync([{"dn": group_dn}])
                except LDAPInvalidFilterError:
                    self.fail("LDAPInvalidFilterError should not be raised with escaped filter")

            mock_standard.paged_search.assert_called()
            search_filter = mock_standard.paged_search.call_args.kwargs["search_filter"]
            for sequence in escaped_sequences:
                self.assertIn(sequence, search_filter)
Test membership synchronization with special characters in group DN
def test_escape_filter_chars_function(self):
    """Check ``escape_filter_chars`` against a table of inputs and expected outputs.

    Covers parentheses, backslash, asterisk, a mix of them, a string without
    special characters, and the empty string. Each input runs as its own subtest.
    """
    expected_by_input = {
        "test(group)": "test\\28group\\29",  # parentheses
        "test\\group": "test\\5cgroup",  # backslash
        "test*group": "test\\2agroup",  # asterisk
        "test(*)group": "test\\28\\2a\\29group",  # multiple special chars
        "normalgroup": "normalgroup",  # no special chars
        "": "",  # empty string
    }

    for input_str, expected in expected_by_input.items():
        with self.subTest(input_str=input_str):
            self.assertEqual(escape_filter_chars(input_str), expected)
Test the escape_filter_chars function directly