authentik.sources.ldap.tests.test_auth

LDAP Source tests

  1"""LDAP Source tests"""
  2
  3from unittest.mock import MagicMock, Mock, patch
  4
  5from django.db.models import Q
  6from django.test import TestCase
  7
  8from authentik.blueprints.tests import apply_blueprint
  9from authentik.core.models import User
 10from authentik.lib.generators import generate_key
 11from authentik.sources.ldap.auth import LDAPBackend
 12from authentik.sources.ldap.models import LDAPSource, LDAPSourcePropertyMapping
 13from authentik.sources.ldap.sync.users import UserLDAPSynchronizer
 14from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
 15from authentik.sources.ldap.tests.mock_slapd import mock_slapd_connection
 16from authentik.tasks.models import Task
 17
 18LDAP_PASSWORD = generate_key()
 19
 20
 21class LDAPSyncTests(TestCase):
 22    """LDAP Sync tests"""
 23
 24    @apply_blueprint("system/sources-ldap.yaml")
 25    def setUp(self):
 26        self.source = LDAPSource.objects.create(
 27            name="ldap",
 28            slug="ldap",
 29            base_dn="dc=goauthentik,dc=io",
 30            additional_user_dn="ou=users",
 31            additional_group_dn="ou=groups",
 32        )
 33
 34    def test_auth_direct_user_ad(self):
 35        """Test direct auth"""
 36        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 37        self.source.additional_user_dn = ""
 38        self.source.additional_group_dn = ""
 39        self.source.save()
 40        self.source.user_property_mappings.set(
 41            LDAPSourcePropertyMapping.objects.filter(
 42                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
 43                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
 44            )
 45        )
 46        raw_conn = mock_ad_connection()
 47        bind_mock = Mock(wraps=raw_conn.bind)
 48        raw_conn.bind = bind_mock
 49        connection = MagicMock(return_value=raw_conn)
 50        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 51            user_sync = UserLDAPSynchronizer(self.source, Task())
 52            user_sync.sync_full()
 53
 54            user = User.objects.get(username="erin.h")
 55            # auth_user_by_bind = Mock(return_value=user)
 56            backend = LDAPBackend()
 57            self.assertEqual(
 58                backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
 59                user,
 60            )
 61            connection.assert_called_with(
 62                connection_kwargs={
 63                    "user": "CN=Erin M. Hagens,OU=ak-test,DC=t,DC=goauthentik,DC=io",
 64                    "password": LDAP_PASSWORD,
 65                }
 66            )
 67            bind_mock.assert_not_called()
 68
 69    def test_auth_synced_user_ad(self):
 70        """Test Cached auth"""
 71        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 72        self.source.additional_user_dn = ""
 73        self.source.additional_group_dn = ""
 74        self.source.save()
 75        self.source.user_property_mappings.set(
 76            LDAPSourcePropertyMapping.objects.filter(
 77                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
 78                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
 79            )
 80        )
 81        connection = MagicMock(return_value=mock_ad_connection())
 82        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 83            user_sync = UserLDAPSynchronizer(self.source, Task())
 84            user_sync.sync_full()
 85
 86            user = User.objects.get(username="erin.h")
 87            auth_user_by_bind = Mock(return_value=user)
 88            with patch(
 89                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
 90                auth_user_by_bind,
 91            ):
 92                backend = LDAPBackend()
 93                self.assertEqual(
 94                    backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
 95                    user,
 96                )
 97
 98    def test_auth_synced_user_openldap(self):
 99        """Test Cached auth"""
100        self.source.object_uniqueness_field = "uid"
101        self.source.user_property_mappings.set(
102            LDAPSourcePropertyMapping.objects.filter(
103                Q(name__startswith="authentik default LDAP Mapping")
104                | Q(name__startswith="authentik default OpenLDAP Mapping")
105            )
106        )
107        self.source.save()
108        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
109        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
110            user_sync = UserLDAPSynchronizer(self.source, Task())
111            user_sync.sync_full()
112
113            user = User.objects.get(username="user0_sn")
114            auth_user_by_bind = Mock(return_value=user)
115            with patch(
116                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
117                auth_user_by_bind,
118            ):
119                backend = LDAPBackend()
120                self.assertEqual(
121                    backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
122                    user,
123                )
LDAP_PASSWORD = '3C:F*$es;ZC<w-U@5(Hd,(x}1|XsaOnF!3SH6"Arb]5{f4F"Q>&uYz:o1N6v[ixHE\'FLQZ;}Ol1v]9(xOCJQ&yb7BLlvbWwNZ~\'KXDL^_}mJ/.,8nk#&JYrCGi{_Mwcb'
class LDAPSyncTests(django.test.testcases.TestCase):
 22class LDAPSyncTests(TestCase):
 23    """LDAP Sync tests"""
 24
 25    @apply_blueprint("system/sources-ldap.yaml")
 26    def setUp(self):
 27        self.source = LDAPSource.objects.create(
 28            name="ldap",
 29            slug="ldap",
 30            base_dn="dc=goauthentik,dc=io",
 31            additional_user_dn="ou=users",
 32            additional_group_dn="ou=groups",
 33        )
 34
 35    def test_auth_direct_user_ad(self):
 36        """Test direct auth"""
 37        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 38        self.source.additional_user_dn = ""
 39        self.source.additional_group_dn = ""
 40        self.source.save()
 41        self.source.user_property_mappings.set(
 42            LDAPSourcePropertyMapping.objects.filter(
 43                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
 44                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
 45            )
 46        )
 47        raw_conn = mock_ad_connection()
 48        bind_mock = Mock(wraps=raw_conn.bind)
 49        raw_conn.bind = bind_mock
 50        connection = MagicMock(return_value=raw_conn)
 51        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 52            user_sync = UserLDAPSynchronizer(self.source, Task())
 53            user_sync.sync_full()
 54
 55            user = User.objects.get(username="erin.h")
 56            # auth_user_by_bind = Mock(return_value=user)
 57            backend = LDAPBackend()
 58            self.assertEqual(
 59                backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
 60                user,
 61            )
 62            connection.assert_called_with(
 63                connection_kwargs={
 64                    "user": "CN=Erin M. Hagens,OU=ak-test,DC=t,DC=goauthentik,DC=io",
 65                    "password": LDAP_PASSWORD,
 66                }
 67            )
 68            bind_mock.assert_not_called()
 69
 70    def test_auth_synced_user_ad(self):
 71        """Test Cached auth"""
 72        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
 73        self.source.additional_user_dn = ""
 74        self.source.additional_group_dn = ""
 75        self.source.save()
 76        self.source.user_property_mappings.set(
 77            LDAPSourcePropertyMapping.objects.filter(
 78                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
 79                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
 80            )
 81        )
 82        connection = MagicMock(return_value=mock_ad_connection())
 83        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
 84            user_sync = UserLDAPSynchronizer(self.source, Task())
 85            user_sync.sync_full()
 86
 87            user = User.objects.get(username="erin.h")
 88            auth_user_by_bind = Mock(return_value=user)
 89            with patch(
 90                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
 91                auth_user_by_bind,
 92            ):
 93                backend = LDAPBackend()
 94                self.assertEqual(
 95                    backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
 96                    user,
 97                )
 98
 99    def test_auth_synced_user_openldap(self):
100        """Test Cached auth"""
101        self.source.object_uniqueness_field = "uid"
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(name__startswith="authentik default LDAP Mapping")
105                | Q(name__startswith="authentik default OpenLDAP Mapping")
106            )
107        )
108        self.source.save()
109        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
110        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
111            user_sync = UserLDAPSynchronizer(self.source, Task())
112            user_sync.sync_full()
113
114            user = User.objects.get(username="user0_sn")
115            auth_user_by_bind = Mock(return_value=user)
116            with patch(
117                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
118                auth_user_by_bind,
119            ):
120                backend = LDAPBackend()
121                self.assertEqual(
122                    backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
123                    user,
124                )

LDAP Sync tests

@apply_blueprint('system/sources-ldap.yaml')
def setUp(self):
25    @apply_blueprint("system/sources-ldap.yaml")
26    def setUp(self):
27        self.source = LDAPSource.objects.create(
28            name="ldap",
29            slug="ldap",
30            base_dn="dc=goauthentik,dc=io",
31            additional_user_dn="ou=users",
32            additional_group_dn="ou=groups",
33        )

Hook method for setting up the test fixture before exercising it.

def test_auth_direct_user_ad(self):
35    def test_auth_direct_user_ad(self):
36        """Test direct auth"""
37        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
38        self.source.additional_user_dn = ""
39        self.source.additional_group_dn = ""
40        self.source.save()
41        self.source.user_property_mappings.set(
42            LDAPSourcePropertyMapping.objects.filter(
43                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
44                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
45            )
46        )
47        raw_conn = mock_ad_connection()
48        bind_mock = Mock(wraps=raw_conn.bind)
49        raw_conn.bind = bind_mock
50        connection = MagicMock(return_value=raw_conn)
51        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
52            user_sync = UserLDAPSynchronizer(self.source, Task())
53            user_sync.sync_full()
54
55            user = User.objects.get(username="erin.h")
56            # auth_user_by_bind = Mock(return_value=user)
57            backend = LDAPBackend()
58            self.assertEqual(
59                backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
60                user,
61            )
62            connection.assert_called_with(
63                connection_kwargs={
64                    "user": "CN=Erin M. Hagens,OU=ak-test,DC=t,DC=goauthentik,DC=io",
65                    "password": LDAP_PASSWORD,
66                }
67            )
68            bind_mock.assert_not_called()

Test direct auth

def test_auth_synced_user_ad(self):
70    def test_auth_synced_user_ad(self):
71        """Test Cached auth"""
72        self.source.base_dn = "dc=t,dc=goauthentik,dc=io"
73        self.source.additional_user_dn = ""
74        self.source.additional_group_dn = ""
75        self.source.save()
76        self.source.user_property_mappings.set(
77            LDAPSourcePropertyMapping.objects.filter(
78                Q(managed__startswith="goauthentik.io/sources/ldap/default-")
79                | Q(managed__startswith="goauthentik.io/sources/ldap/ms-")
80            )
81        )
82        connection = MagicMock(return_value=mock_ad_connection())
83        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
84            user_sync = UserLDAPSynchronizer(self.source, Task())
85            user_sync.sync_full()
86
87            user = User.objects.get(username="erin.h")
88            auth_user_by_bind = Mock(return_value=user)
89            with patch(
90                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
91                auth_user_by_bind,
92            ):
93                backend = LDAPBackend()
94                self.assertEqual(
95                    backend.authenticate(None, username="erin.h", password=LDAP_PASSWORD),
96                    user,
97                )

Test Cached auth

def test_auth_synced_user_openldap(self):
 99    def test_auth_synced_user_openldap(self):
100        """Test Cached auth"""
101        self.source.object_uniqueness_field = "uid"
102        self.source.user_property_mappings.set(
103            LDAPSourcePropertyMapping.objects.filter(
104                Q(name__startswith="authentik default LDAP Mapping")
105                | Q(name__startswith="authentik default OpenLDAP Mapping")
106            )
107        )
108        self.source.save()
109        connection = MagicMock(return_value=mock_slapd_connection(LDAP_PASSWORD))
110        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
111            user_sync = UserLDAPSynchronizer(self.source, Task())
112            user_sync.sync_full()
113
114            user = User.objects.get(username="user0_sn")
115            auth_user_by_bind = Mock(return_value=user)
116            with patch(
117                "authentik.sources.ldap.auth.LDAPBackend.auth_user_by_bind",
118                auth_user_by_bind,
119            ):
120                backend = LDAPBackend()
121                self.assertEqual(
122                    backend.authenticate(None, username="user0_sn", password=LDAP_PASSWORD),
123                    user,
124                )

Test Cached auth