authentik.sources.kerberos.tests.test_sync
Kerberos Source sync tests
1"""Kerberos Source sync tests""" 2 3from authentik.blueprints.tests import apply_blueprint 4from authentik.core.models import User 5from authentik.lib.generators import generate_id 6from authentik.sources.kerberos.models import KerberosSource, KerberosSourcePropertyMapping 7from authentik.sources.kerberos.sync import KerberosSync 8from authentik.sources.kerberos.tasks import kerberos_sync 9from authentik.sources.kerberos.tests.utils import KerberosTestCase 10from authentik.tasks.models import Task 11 12 13class TestKerberosSync(KerberosTestCase): 14 """Kerberos Sync tests""" 15 16 @apply_blueprint("system/sources-kerberos.yaml") 17 def setUp(self): 18 self.source: KerberosSource = KerberosSource.objects.create( 19 name="kerberos", 20 slug="kerberos", 21 realm=self.realm.realm, 22 sync_users=True, 23 sync_users_password=True, 24 sync_principal=self.realm.admin_princ, 25 sync_password=self.realm.password("admin"), 26 ) 27 self.source.user_property_mappings.set( 28 KerberosSourcePropertyMapping.objects.filter( 29 managed__startswith="goauthentik.io/sources/kerberos/user/default/" 30 ) 31 ) 32 33 def test_default_mappings(self): 34 """Test default mappings""" 35 KerberosSync(self.source, Task()).sync() 36 37 self.assertTrue( 38 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 39 ) 40 self.assertFalse( 41 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 42 ) 43 44 def test_sync_mapping(self): 45 """Test property mappings""" 46 noop = KerberosSourcePropertyMapping.objects.create( 47 name=generate_id(), expression="return {}" 48 ) 49 email = KerberosSourcePropertyMapping.objects.create( 50 name=generate_id(), expression='return {"email": principal.lower()}' 51 ) 52 dont_sync_service = KerberosSourcePropertyMapping.objects.create( 53 name=generate_id(), 54 expression='if "/" in principal:\n return {"username": None}\nreturn {}', 55 ) 56 self.source.user_property_mappings.set([noop, email, dont_sync_service]) 57 58 KerberosSync(self.source, Task()).sync() 59 60 self.assertTrue( 61 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 62 ) 63 self.assertEqual( 64 User.objects.get(username=self.realm.user_princ.rsplit("@", 1)[0]).email, 65 self.realm.user_princ.lower(), 66 ) 67 self.assertFalse( 68 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 69 ) 70 71 def test_tasks(self): 72 """Test Scheduled tasks""" 73 kerberos_sync.send(self.source.pk) 74 self.assertTrue( 75 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 76 )
14class TestKerberosSync(KerberosTestCase): 15 """Kerberos Sync tests""" 16 17 @apply_blueprint("system/sources-kerberos.yaml") 18 def setUp(self): 19 self.source: KerberosSource = KerberosSource.objects.create( 20 name="kerberos", 21 slug="kerberos", 22 realm=self.realm.realm, 23 sync_users=True, 24 sync_users_password=True, 25 sync_principal=self.realm.admin_princ, 26 sync_password=self.realm.password("admin"), 27 ) 28 self.source.user_property_mappings.set( 29 KerberosSourcePropertyMapping.objects.filter( 30 managed__startswith="goauthentik.io/sources/kerberos/user/default/" 31 ) 32 ) 33 34 def test_default_mappings(self): 35 """Test default mappings""" 36 KerberosSync(self.source, Task()).sync() 37 38 self.assertTrue( 39 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 40 ) 41 self.assertFalse( 42 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 43 ) 44 45 def test_sync_mapping(self): 46 """Test property mappings""" 47 noop = KerberosSourcePropertyMapping.objects.create( 48 name=generate_id(), expression="return {}" 49 ) 50 email = KerberosSourcePropertyMapping.objects.create( 51 name=generate_id(), expression='return {"email": principal.lower()}' 52 ) 53 dont_sync_service = KerberosSourcePropertyMapping.objects.create( 54 name=generate_id(), 55 expression='if "/" in principal:\n return {"username": None}\nreturn {}', 56 ) 57 self.source.user_property_mappings.set([noop, email, dont_sync_service]) 58 59 KerberosSync(self.source, Task()).sync() 60 61 self.assertTrue( 62 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 63 ) 64 self.assertEqual( 65 User.objects.get(username=self.realm.user_princ.rsplit("@", 1)[0]).email, 66 self.realm.user_princ.lower(), 67 ) 68 self.assertFalse( 69 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 70 ) 71 72 def test_tasks(self): 73 """Test Scheduled tasks""" 74 kerberos_sync.send(self.source.pk) 75 self.assertTrue( 76 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 77 )
Kerberos Sync tests
@apply_blueprint('system/sources-kerberos.yaml')
def
setUp(self):
17 @apply_blueprint("system/sources-kerberos.yaml") 18 def setUp(self): 19 self.source: KerberosSource = KerberosSource.objects.create( 20 name="kerberos", 21 slug="kerberos", 22 realm=self.realm.realm, 23 sync_users=True, 24 sync_users_password=True, 25 sync_principal=self.realm.admin_princ, 26 sync_password=self.realm.password("admin"), 27 ) 28 self.source.user_property_mappings.set( 29 KerberosSourcePropertyMapping.objects.filter( 30 managed__startswith="goauthentik.io/sources/kerberos/user/default/" 31 ) 32 )
Hook method for setting up the test fixture before exercising it.
def
test_default_mappings(self):
34 def test_default_mappings(self): 35 """Test default mappings""" 36 KerberosSync(self.source, Task()).sync() 37 38 self.assertTrue( 39 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 40 ) 41 self.assertFalse( 42 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 43 )
Test default mappings
def
test_sync_mapping(self):
45 def test_sync_mapping(self): 46 """Test property mappings""" 47 noop = KerberosSourcePropertyMapping.objects.create( 48 name=generate_id(), expression="return {}" 49 ) 50 email = KerberosSourcePropertyMapping.objects.create( 51 name=generate_id(), expression='return {"email": principal.lower()}' 52 ) 53 dont_sync_service = KerberosSourcePropertyMapping.objects.create( 54 name=generate_id(), 55 expression='if "/" in principal:\n return {"username": None}\nreturn {}', 56 ) 57 self.source.user_property_mappings.set([noop, email, dont_sync_service]) 58 59 KerberosSync(self.source, Task()).sync() 60 61 self.assertTrue( 62 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 63 ) 64 self.assertEqual( 65 User.objects.get(username=self.realm.user_princ.rsplit("@", 1)[0]).email, 66 self.realm.user_princ.lower(), 67 ) 68 self.assertFalse( 69 User.objects.filter(username=self.realm.nfs_princ.rsplit("@", 1)[0]).exists() 70 )
Test property mappings
def
test_tasks(self):
72 def test_tasks(self): 73 """Test Scheduled tasks""" 74 kerberos_sync.send(self.source.pk) 75 self.assertTrue( 76 User.objects.filter(username=self.realm.user_princ.rsplit("@", 1)[0]).exists() 77 )
Test Scheduled tasks