authentik.sources.kerberos.tests.test_sync

Kerberos Source sync tests

 1"""Kerberos Source sync tests"""
 2
 3from authentik.blueprints.tests import apply_blueprint
 4from authentik.core.models import User
 5from authentik.lib.generators import generate_id
 6from authentik.sources.kerberos.models import KerberosSource, KerberosSourcePropertyMapping
 7from authentik.sources.kerberos.sync import KerberosSync
 8from authentik.sources.kerberos.tasks import kerberos_sync
 9from authentik.sources.kerberos.tests.utils import KerberosTestCase
10from authentik.tasks.models import Task
11
12
class TestKerberosSync(KerberosTestCase):
    """Kerberos Sync tests

    Runs ``KerberosSync`` and the ``kerberos_sync`` task against the realm
    available as ``self.realm`` (presumably provided by ``KerberosTestCase``)
    and checks which users end up in the database.
    """

    @staticmethod
    def _username_of(principal):
        """Return the part of ``principal`` in front of its last ``@``."""
        return principal.rsplit("@", 1)[0]

    @apply_blueprint("system/sources-kerberos.yaml")
    def setUp(self):
        """Create a syncing Kerberos source and attach the managed default user mappings."""
        source_fields = {
            "name": "kerberos",
            "slug": "kerberos",
            "realm": self.realm.realm,
            "sync_users": True,
            "sync_users_password": True,
            "sync_principal": self.realm.admin_princ,
            "sync_password": self.realm.password("admin"),
        }
        self.source: KerberosSource = KerberosSource.objects.create(**source_fields)

        # Only the mappings managed under the default user prefix are attached.
        default_user_mappings = KerberosSourcePropertyMapping.objects.filter(
            managed__startswith="goauthentik.io/sources/kerberos/user/default/"
        )
        self.source.user_property_mappings.set(default_user_mappings)

    def test_default_mappings(self):
        """With the default mappings the user principal is synced, the nfs principal is not."""
        KerberosSync(self.source, Task()).sync()

        user_username = self._username_of(self.realm.user_princ)
        self.assertTrue(User.objects.filter(username=user_username).exists())

        service_username = self._username_of(self.realm.nfs_princ)
        self.assertFalse(User.objects.filter(username=service_username).exists())

    def test_sync_mapping(self):
        """Custom property mappings are evaluated during sync."""
        # Created in this order: a no-op mapping, one that sets the email to the
        # lower-cased principal, and one that returns a ``None`` username for
        # principals containing "/".
        expressions = (
            "return {}",
            'return {"email": principal.lower()}',
            'if "/" in principal:\n    return {"username": None}\nreturn {}',
        )
        custom_mappings = [
            KerberosSourcePropertyMapping.objects.create(name=generate_id(), expression=expr)
            for expr in expressions
        ]
        self.source.user_property_mappings.set(custom_mappings)

        KerberosSync(self.source, Task()).sync()

        user_username = self._username_of(self.realm.user_princ)
        self.assertTrue(User.objects.filter(username=user_username).exists())

        synced_user = User.objects.get(username=self._username_of(self.realm.user_princ))
        self.assertEqual(synced_user.email, self.realm.user_princ.lower())

        service_username = self._username_of(self.realm.nfs_princ)
        self.assertFalse(User.objects.filter(username=service_username).exists())

    def test_tasks(self):
        """Sending the ``kerberos_sync`` task for the source creates the user."""
        kerberos_sync.send(self.source.pk)

        user_username = self._username_of(self.realm.user_princ)
        self.assertTrue(User.objects.filter(username=user_username).exists())
class TestKerberosSync(authentik.sources.kerberos.tests.utils.KerberosTestCase):
class TestKerberosSync(KerberosTestCase):
    """Kerberos Sync tests

    Each test syncs the source built in ``setUp`` and then inspects the
    ``User`` table for the principals exposed on ``self.realm``.
    """

    @apply_blueprint("system/sources-kerberos.yaml")
    def setUp(self):
        """Build the Kerberos source under test with user and password sync enabled."""
        realm = self.realm
        self.source: KerberosSource = KerberosSource.objects.create(
            name="kerberos",
            slug="kerberos",
            realm=realm.realm,
            sync_users=True,
            sync_users_password=True,
            sync_principal=realm.admin_princ,
            sync_password=realm.password("admin"),
        )
        managed_defaults = KerberosSourcePropertyMapping.objects.filter(
            managed__startswith="goauthentik.io/sources/kerberos/user/default/"
        )
        self.source.user_property_mappings.set(managed_defaults)

    def test_default_mappings(self):
        """Default mappings: a user exists for the user principal, none for the nfs one."""
        syncer = KerberosSync(self.source, Task())
        syncer.sync()

        expected_username = self.realm.user_princ.rsplit("@", 1)[0]
        self.assertTrue(User.objects.filter(username=expected_username).exists())

        skipped_username = self.realm.nfs_princ.rsplit("@", 1)[0]
        self.assertFalse(User.objects.filter(username=skipped_username).exists())

    def test_sync_mapping(self):
        """Property mappings set on the source drive what the sync writes."""
        create_mapping = KerberosSourcePropertyMapping.objects.create
        # Returns nothing, so it should not affect the result.
        noop = create_mapping(name=generate_id(), expression="return {}")
        # Sets the email to the lower-cased principal.
        email = create_mapping(
            name=generate_id(),
            expression='return {"email": principal.lower()}',
        )
        # Returns a ``None`` username for principals containing "/".
        dont_sync_service = create_mapping(
            name=generate_id(),
            expression='if "/" in principal:\n    return {"username": None}\nreturn {}',
        )
        self.source.user_property_mappings.set([noop, email, dont_sync_service])

        syncer = KerberosSync(self.source, Task())
        syncer.sync()

        expected_username = self.realm.user_princ.rsplit("@", 1)[0]
        self.assertTrue(User.objects.filter(username=expected_username).exists())

        stored_email = User.objects.get(username=self.realm.user_princ.rsplit("@", 1)[0]).email
        self.assertEqual(stored_email, self.realm.user_princ.lower())

        skipped_username = self.realm.nfs_princ.rsplit("@", 1)[0]
        self.assertFalse(User.objects.filter(username=skipped_username).exists())

    def test_tasks(self):
        """The ``kerberos_sync`` task, sent with the source pk, syncs the user principal."""
        kerberos_sync.send(self.source.pk)

        expected_username = self.realm.user_princ.rsplit("@", 1)[0]
        self.assertTrue(User.objects.filter(username=expected_username).exists())

Kerberos Sync tests

@apply_blueprint('system/sources-kerberos.yaml')
def setUp(self):
17    @apply_blueprint("system/sources-kerberos.yaml")
18    def setUp(self):
19        self.source: KerberosSource = KerberosSource.objects.create(
20            name="kerberos",
21            slug="kerberos",
22            realm=self.realm.realm,
23            sync_users=True,
24            sync_users_password=True,
25            sync_principal=self.realm.admin_princ,
26            sync_password=self.realm.password("admin"),
27        )
28        self.source.user_property_mappings.set(
29            KerberosSourcePropertyMapping.objects.filter(
30                managed__startswith="goauthentik.io/sources/kerberos/user/default/"
31            )
32        )

Hook method for setting up the test fixture before exercising it.

def test_default_mappings(self):
34    def test_default_mappings(self):
35        """Test default mappings"""
36        KerberosSync(self.source, Task()).sync()
37
38        self.assertTrue(
39            User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists()
40        )
41        self.assertFalse(
42            User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists()
43        )

Test default mappings

def test_sync_mapping(self):
45    def test_sync_mapping(self):
46        """Test property mappings"""
47        noop = KerberosSourcePropertyMapping.objects.create(
48            name=generate_id(), expression="return {}"
49        )
50        email = KerberosSourcePropertyMapping.objects.create(
51            name=generate_id(), expression='return {"email": principal.lower()}'
52        )
53        dont_sync_service = KerberosSourcePropertyMapping.objects.create(
54            name=generate_id(),
55            expression='if "/" in principal:\n    return {"username": None}\nreturn {}',
56        )
57        self.source.user_property_mappings.set([noop, email, dont_sync_service])
58
59        KerberosSync(self.source, Task()).sync()
60
61        self.assertTrue(
62            User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists()
63        )
64        self.assertEqual(
65            User.objects.get(username=self.realm.user_princ.rsplit("@", 1)[0]).email,
66            self.realm.user_princ.lower(),
67        )
68        self.assertFalse(
69            User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists()
70        )

Test property mappings

def test_tasks(self):
72    def test_tasks(self):
73        """Test Scheduled tasks"""
74        kerberos_sync.send(self.source.pk)
75        self.assertTrue(
76            User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists()
77        )

Test Scheduled tasks