authentik.sources.ldap.tests.test_api

LDAP Source API tests

  1"""LDAP Source API tests"""
  2
  3from json import loads
  4from unittest.mock import MagicMock, patch
  5
  6from django.db.models import Q
  7from django.urls import reverse
  8from rest_framework.exceptions import ErrorDetail
  9from rest_framework.test import APITestCase
 10
 11from authentik.blueprints.tests import apply_blueprint
 12from authentik.core.tests.utils import create_test_admin_user
 13from authentik.lib.generators import generate_id
 14from authentik.sources.ldap.api import LDAPSourceSerializer
 15from authentik.sources.ldap.models import LDAPSource, LDAPSourcePropertyMapping
 16from authentik.sources.ldap.tests.mock_ad import mock_ad_connection
 17
 18
 19class LDAPAPITests(APITestCase):
 20    """LDAP API tests"""
 21
 22    def test_sync_users_password_valid(self):
 23        """Check that single source with sync_users_password is valid"""
 24        serializer = LDAPSourceSerializer(
 25            data={
 26                "name": "foo",
 27                "slug": " foo",
 28                "server_uri": "ldaps://1.2.3.4",
 29                "bind_cn": "",
 30                "bind_password": generate_id(),
 31                "base_dn": "dc=foo",
 32                "sync_users_password": True,
 33            }
 34        )
 35        self.assertTrue(serializer.is_valid())
 36        self.assertEqual(serializer.errors, {})
 37
 38    def test_sync_users_password_invalid(self):
 39        """Ensure only a single source with password sync can be created"""
 40        LDAPSource.objects.create(
 41            name="foo",
 42            slug=generate_id(),
 43            server_uri="ldaps://1.2.3.4",
 44            bind_cn="",
 45            bind_password=generate_id(),
 46            base_dn="dc=foo",
 47            sync_users_password=True,
 48        )
 49        serializer = LDAPSourceSerializer(
 50            data={
 51                "name": "foo",
 52                "slug": generate_id(),
 53                "server_uri": "ldaps://1.2.3.4",
 54                "bind_cn": "",
 55                "bind_password": generate_id(),
 56                "base_dn": "dc=foo",
 57                "sync_users_password": True,
 58            }
 59        )
 60        self.assertFalse(serializer.is_valid())
 61        self.assertEqual(
 62            serializer.errors,
 63            {
 64                "sync_users_password": [
 65                    ErrorDetail(
 66                        string="Only a single LDAP Source with password synchronization is allowed",
 67                        code="invalid",
 68                    )
 69                ]
 70            },
 71        )
 72
 73    def test_sync_users_mapping_empty(self):
 74        """Check that when sync_users is enabled, property mappings must be set"""
 75        serializer = LDAPSourceSerializer(
 76            data={
 77                "name": "foo",
 78                "slug": " foo",
 79                "server_uri": "ldaps://1.2.3.4",
 80                "bind_cn": "",
 81                "bind_password": generate_id(),
 82                "base_dn": "dc=foo",
 83                "sync_users": True,
 84                "user_property_mappings": [],
 85            }
 86        )
 87        self.assertFalse(serializer.is_valid())
 88
 89    def test_sync_groups_mapping_empty(self):
 90        """Check that when sync_groups is enabled, property mappings must be set"""
 91        serializer = LDAPSourceSerializer(
 92            data={
 93                "name": "foo",
 94                "slug": " foo",
 95                "server_uri": "ldaps://1.2.3.4",
 96                "bind_cn": "",
 97                "bind_password": generate_id(),
 98                "base_dn": "dc=foo",
 99                "sync_groups": True,
100                "group_property_mappings": [],
101            }
102        )
103        self.assertFalse(serializer.is_valid())
104
105    @apply_blueprint("system/sources-ldap.yaml")
106    def test_sync_debug(self):
107        user = create_test_admin_user()
108        self.client.force_login(user)
109
110        source: LDAPSource = LDAPSource.objects.create(
111            name=generate_id(),
112            slug=generate_id(),
113            base_dn="dc=goauthentik,dc=io",
114            additional_user_dn="ou=users",
115            additional_group_dn="ou=groups",
116        )
117        source.user_property_mappings.set(
118            LDAPSourcePropertyMapping.objects.filter(
119                Q(managed__startswith="goauthentik.io/sources/ldap/default")
120                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
121            )
122        )
123        connection = MagicMock(return_value=mock_ad_connection())
124        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
125            res = self.client.get(
126                reverse("authentik_api:ldapsource-debug", kwargs={"slug": source.slug})
127            )
128            self.assertEqual(res.status_code, 200)
129            body = loads(res.content.decode())
130            self.assertIn("users", body)
131            self.assertIn("groups", body)
132            self.assertIn("membership", body)
class LDAPAPITests(rest_framework.test.APITestCase):
 20class LDAPAPITests(APITestCase):
 21    """LDAP API tests"""
 22
 23    def test_sync_users_password_valid(self):
 24        """Check that single source with sync_users_password is valid"""
 25        serializer = LDAPSourceSerializer(
 26            data={
 27                "name": "foo",
 28                "slug": " foo",
 29                "server_uri": "ldaps://1.2.3.4",
 30                "bind_cn": "",
 31                "bind_password": generate_id(),
 32                "base_dn": "dc=foo",
 33                "sync_users_password": True,
 34            }
 35        )
 36        self.assertTrue(serializer.is_valid())
 37        self.assertEqual(serializer.errors, {})
 38
 39    def test_sync_users_password_invalid(self):
 40        """Ensure only a single source with password sync can be created"""
 41        LDAPSource.objects.create(
 42            name="foo",
 43            slug=generate_id(),
 44            server_uri="ldaps://1.2.3.4",
 45            bind_cn="",
 46            bind_password=generate_id(),
 47            base_dn="dc=foo",
 48            sync_users_password=True,
 49        )
 50        serializer = LDAPSourceSerializer(
 51            data={
 52                "name": "foo",
 53                "slug": generate_id(),
 54                "server_uri": "ldaps://1.2.3.4",
 55                "bind_cn": "",
 56                "bind_password": generate_id(),
 57                "base_dn": "dc=foo",
 58                "sync_users_password": True,
 59            }
 60        )
 61        self.assertFalse(serializer.is_valid())
 62        self.assertEqual(
 63            serializer.errors,
 64            {
 65                "sync_users_password": [
 66                    ErrorDetail(
 67                        string="Only a single LDAP Source with password synchronization is allowed",
 68                        code="invalid",
 69                    )
 70                ]
 71            },
 72        )
 73
 74    def test_sync_users_mapping_empty(self):
 75        """Check that when sync_users is enabled, property mappings must be set"""
 76        serializer = LDAPSourceSerializer(
 77            data={
 78                "name": "foo",
 79                "slug": " foo",
 80                "server_uri": "ldaps://1.2.3.4",
 81                "bind_cn": "",
 82                "bind_password": generate_id(),
 83                "base_dn": "dc=foo",
 84                "sync_users": True,
 85                "user_property_mappings": [],
 86            }
 87        )
 88        self.assertFalse(serializer.is_valid())
 89
 90    def test_sync_groups_mapping_empty(self):
 91        """Check that when sync_groups is enabled, property mappings must be set"""
 92        serializer = LDAPSourceSerializer(
 93            data={
 94                "name": "foo",
 95                "slug": " foo",
 96                "server_uri": "ldaps://1.2.3.4",
 97                "bind_cn": "",
 98                "bind_password": generate_id(),
 99                "base_dn": "dc=foo",
100                "sync_groups": True,
101                "group_property_mappings": [],
102            }
103        )
104        self.assertFalse(serializer.is_valid())
105
106    @apply_blueprint("system/sources-ldap.yaml")
107    def test_sync_debug(self):
108        user = create_test_admin_user()
109        self.client.force_login(user)
110
111        source: LDAPSource = LDAPSource.objects.create(
112            name=generate_id(),
113            slug=generate_id(),
114            base_dn="dc=goauthentik,dc=io",
115            additional_user_dn="ou=users",
116            additional_group_dn="ou=groups",
117        )
118        source.user_property_mappings.set(
119            LDAPSourcePropertyMapping.objects.filter(
120                Q(managed__startswith="goauthentik.io/sources/ldap/default")
121                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
122            )
123        )
124        connection = MagicMock(return_value=mock_ad_connection())
125        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
126            res = self.client.get(
127                reverse("authentik_api:ldapsource-debug", kwargs={"slug": source.slug})
128            )
129            self.assertEqual(res.status_code, 200)
130            body = loads(res.content.decode())
131            self.assertIn("users", body)
132            self.assertIn("groups", body)
133            self.assertIn("membership", body)

LDAP API tests

def test_sync_users_password_valid(self):
23    def test_sync_users_password_valid(self):
24        """Check that single source with sync_users_password is valid"""
25        serializer = LDAPSourceSerializer(
26            data={
27                "name": "foo",
28                "slug": " foo",
29                "server_uri": "ldaps://1.2.3.4",
30                "bind_cn": "",
31                "bind_password": generate_id(),
32                "base_dn": "dc=foo",
33                "sync_users_password": True,
34            }
35        )
36        self.assertTrue(serializer.is_valid())
37        self.assertEqual(serializer.errors, {})

Check that single source with sync_users_password is valid

def test_sync_users_password_invalid(self):
39    def test_sync_users_password_invalid(self):
40        """Ensure only a single source with password sync can be created"""
41        LDAPSource.objects.create(
42            name="foo",
43            slug=generate_id(),
44            server_uri="ldaps://1.2.3.4",
45            bind_cn="",
46            bind_password=generate_id(),
47            base_dn="dc=foo",
48            sync_users_password=True,
49        )
50        serializer = LDAPSourceSerializer(
51            data={
52                "name": "foo",
53                "slug": generate_id(),
54                "server_uri": "ldaps://1.2.3.4",
55                "bind_cn": "",
56                "bind_password": generate_id(),
57                "base_dn": "dc=foo",
58                "sync_users_password": True,
59            }
60        )
61        self.assertFalse(serializer.is_valid())
62        self.assertEqual(
63            serializer.errors,
64            {
65                "sync_users_password": [
66                    ErrorDetail(
67                        string="Only a single LDAP Source with password synchronization is allowed",
68                        code="invalid",
69                    )
70                ]
71            },
72        )

Ensure only a single source with password sync can be created

def test_sync_users_mapping_empty(self):
74    def test_sync_users_mapping_empty(self):
75        """Check that when sync_users is enabled, property mappings must be set"""
76        serializer = LDAPSourceSerializer(
77            data={
78                "name": "foo",
79                "slug": " foo",
80                "server_uri": "ldaps://1.2.3.4",
81                "bind_cn": "",
82                "bind_password": generate_id(),
83                "base_dn": "dc=foo",
84                "sync_users": True,
85                "user_property_mappings": [],
86            }
87        )
88        self.assertFalse(serializer.is_valid())

Check that when sync_users is enabled, property mappings must be set

def test_sync_groups_mapping_empty(self):
 90    def test_sync_groups_mapping_empty(self):
 91        """Check that when sync_groups is enabled, property mappings must be set"""
 92        serializer = LDAPSourceSerializer(
 93            data={
 94                "name": "foo",
 95                "slug": " foo",
 96                "server_uri": "ldaps://1.2.3.4",
 97                "bind_cn": "",
 98                "bind_password": generate_id(),
 99                "base_dn": "dc=foo",
100                "sync_groups": True,
101                "group_property_mappings": [],
102            }
103        )
104        self.assertFalse(serializer.is_valid())

Check that when sync_groups is enabled, property mappings must be set

@apply_blueprint('system/sources-ldap.yaml')
def test_sync_debug(self):
106    @apply_blueprint("system/sources-ldap.yaml")
107    def test_sync_debug(self):
108        user = create_test_admin_user()
109        self.client.force_login(user)
110
111        source: LDAPSource = LDAPSource.objects.create(
112            name=generate_id(),
113            slug=generate_id(),
114            base_dn="dc=goauthentik,dc=io",
115            additional_user_dn="ou=users",
116            additional_group_dn="ou=groups",
117        )
118        source.user_property_mappings.set(
119            LDAPSourcePropertyMapping.objects.filter(
120                Q(managed__startswith="goauthentik.io/sources/ldap/default")
121                | Q(managed__startswith="goauthentik.io/sources/ldap/ms")
122            )
123        )
124        connection = MagicMock(return_value=mock_ad_connection())
125        with patch("authentik.sources.ldap.models.LDAPSource.connection", connection):
126            res = self.client.get(
127                reverse("authentik_api:ldapsource-debug", kwargs={"slug": source.slug})
128            )
129            self.assertEqual(res.status_code, 200)
130            body = loads(res.content.decode())
131            self.assertIn("users", body)
132            self.assertIn("groups", body)
133            self.assertIn("membership", body)