authentik.core.tests.test_users
user tests
"""user tests

Covers the core ``User`` model helpers and the ``password_hash`` handling of
``UserSerializer``.
"""

from unittest.mock import patch

from django.contrib.auth.hashers import make_password
from django.test.testcases import TestCase
from rest_framework.exceptions import ValidationError

from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.users import UserSerializer
from authentik.core.models import User
from authentik.core.signals import password_changed, password_hash_changed
from authentik.events.models import Event
from authentik.lib.generators import generate_id


class TestUsers(TestCase):
    """Test user

    Covers managed-role permissions, the ``ak_groups`` accessor and
    hash-based password updates.
    """

    def test_user_managed_role(self):
        """Test user managed role

        Grants a permission through the managed role, checks that exactly one
        role exists and the permission is held, then revokes it again.
        """
        permission = "authentik_core.view_user"
        account = User.objects.create(username=generate_id())

        account.assign_perms_to_managed_role(permission)
        role_count = account.roles.count()
        self.assertEqual(role_count, 1)
        self.assertTrue(account.has_perm(permission))

        account.remove_perms_from_managed_role(permission)
        self.assertFalse(account.has_perm(permission))

    def test_user_ak_groups(self):
        """Test user.ak_groups is a proxy for user.groups

        Both attributes of a freshly created user must compare equal.
        """
        account = User.objects.create(username=generate_id())
        via_ak_groups = account.ak_groups
        via_groups = account.groups
        self.assertEqual(via_ak_groups, via_groups)

    def test_user_ak_groups_event(self):
        """Test user.ak_groups creates exactly one event

        The event count starts at zero and must be one after the first and
        after the second access.
        """
        account = User.objects.create(username=generate_id())
        self.assertEqual(Event.objects.count(), 0)
        # The expected count is 1 after each access: the second one adds nothing.
        for _ in range(2):
            account.ak_groups.all()
            self.assertEqual(Event.objects.count(), 1)

    def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
        """Test hash password updates do not expose a raw password to sync receivers.

        Sets a pre-hashed password while the LDAP and Kerberos lookups are
        patched, then checks that only ``password_hash_changed`` reached its
        receiver and that neither patched lookup was called.
        """
        account = User.objects.create(
            username=generate_id(),
            attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
        )
        raw_signal_calls = []
        hash_signal_calls = []
        raw_uid = generate_id()
        hash_uid = generate_id()

        def record_raw_change(sender, **kwargs):
            raw_signal_calls.append(kwargs)

        def record_hash_change(sender, **kwargs):
            hash_signal_calls.append(kwargs)

        password_changed.connect(record_raw_change, dispatch_uid=raw_uid)
        password_hash_changed.connect(record_hash_change, dispatch_uid=hash_uid)

        ldap_lookup_path = "authentik.sources.ldap.signals.LDAPSource.objects.filter"
        kerberos_lookup_path = (
            "authentik.sources.kerberos.signals."
            "UserKerberosSourceConnection.objects.select_related"
        )
        try:
            with (
                patch(ldap_lookup_path) as ldap_lookup,
                patch(kerberos_lookup_path) as kerberos_lookup,
            ):
                hashed_password = make_password("new-password")  # nosec
                account.set_password_from_hash(hashed_password)
                account.save()
        finally:
            # Detach both receivers whether or not the update succeeded.
            password_changed.disconnect(dispatch_uid=raw_uid)
            password_hash_changed.disconnect(dispatch_uid=hash_uid)

        self.assertEqual(raw_signal_calls, [])
        self.assertEqual(len(hash_signal_calls), 1)
        ldap_lookup.assert_not_called()
        kerberos_lookup.assert_not_called()


class TestUserSerializerPasswordHash(TestCase):
    """Test UserSerializer password_hash support in blueprint context.

    Exercises a valid hash, an invalid hash, and the serializer used without
    the blueprint context.
    """

    def test_password_hash_sets_password_directly(self):
        """Test a valid password hash is stored without re-hashing.

        The saved user must carry the supplied hash verbatim, accept the
        original plaintext, and have a non-null ``password_change_date``.
        """
        plaintext = "test-password-123"  # nosec
        hashed = make_password(plaintext)
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": hashed,
        }
        serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        created = serializer.save()

        self.assertEqual(created.password, hashed)
        self.assertTrue(created.check_password(plaintext))
        self.assertIsNotNone(created.password_change_date)

    def test_password_hash_rejects_invalid_format(self):
        """Test invalid password hash values are rejected.

        Validation itself passes; the ``ValidationError`` is expected from
        ``save()``.
        """
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": "not-a-valid-hash",
        }
        serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        with self.assertRaises(ValidationError) as raised:
            serializer.save()

        message = str(raised.exception)
        self.assertIn("Invalid password hash format", message)

    def test_password_hash_ignored_outside_blueprint_context(self):
        """Test password_hash is not accepted by the regular serializer.

        Without the blueprint context the data still validates, but
        ``password_hash`` is absent from ``validated_data``.
        """
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": make_password("test"),  # nosec
        }
        serializer = UserSerializer(data=payload)

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        self.assertNotIn("password_hash", serializer.validated_data)
class
TestUsers(django.test.testcases.TestCase):
class TestUsers(TestCase):
    """Test user

    Covers managed-role permissions, the ``ak_groups`` accessor and
    hash-based password updates.
    """

    def test_user_managed_role(self):
        """Test user managed role

        Grants a permission through the managed role, checks that exactly one
        role exists and the permission is held, then revokes it again.
        """
        permission = "authentik_core.view_user"
        account = User.objects.create(username=generate_id())

        account.assign_perms_to_managed_role(permission)
        role_count = account.roles.count()
        self.assertEqual(role_count, 1)
        self.assertTrue(account.has_perm(permission))

        account.remove_perms_from_managed_role(permission)
        self.assertFalse(account.has_perm(permission))

    def test_user_ak_groups(self):
        """Test user.ak_groups is a proxy for user.groups

        Both attributes of a freshly created user must compare equal.
        """
        account = User.objects.create(username=generate_id())
        via_ak_groups = account.ak_groups
        via_groups = account.groups
        self.assertEqual(via_ak_groups, via_groups)

    def test_user_ak_groups_event(self):
        """Test user.ak_groups creates exactly one event

        The event count starts at zero and must be one after the first and
        after the second access.
        """
        account = User.objects.create(username=generate_id())
        self.assertEqual(Event.objects.count(), 0)
        # The expected count is 1 after each access: the second one adds nothing.
        for _ in range(2):
            account.ak_groups.all()
            self.assertEqual(Event.objects.count(), 1)

    def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
        """Test hash password updates do not expose a raw password to sync receivers.

        Sets a pre-hashed password while the LDAP and Kerberos lookups are
        patched, then checks that only ``password_hash_changed`` reached its
        receiver and that neither patched lookup was called.
        """
        account = User.objects.create(
            username=generate_id(),
            attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
        )
        raw_signal_calls = []
        hash_signal_calls = []
        raw_uid = generate_id()
        hash_uid = generate_id()

        def record_raw_change(sender, **kwargs):
            raw_signal_calls.append(kwargs)

        def record_hash_change(sender, **kwargs):
            hash_signal_calls.append(kwargs)

        password_changed.connect(record_raw_change, dispatch_uid=raw_uid)
        password_hash_changed.connect(record_hash_change, dispatch_uid=hash_uid)

        ldap_lookup_path = "authentik.sources.ldap.signals.LDAPSource.objects.filter"
        kerberos_lookup_path = (
            "authentik.sources.kerberos.signals."
            "UserKerberosSourceConnection.objects.select_related"
        )
        try:
            with (
                patch(ldap_lookup_path) as ldap_lookup,
                patch(kerberos_lookup_path) as kerberos_lookup,
            ):
                hashed_password = make_password("new-password")  # nosec
                account.set_password_from_hash(hashed_password)
                account.save()
        finally:
            # Detach both receivers whether or not the update succeeded.
            password_changed.disconnect(dispatch_uid=raw_uid)
            password_hash_changed.disconnect(dispatch_uid=hash_uid)

        self.assertEqual(raw_signal_calls, [])
        self.assertEqual(len(hash_signal_calls), 1)
        ldap_lookup.assert_not_called()
        kerberos_lookup.assert_not_called()
Test user
def
test_user_managed_role(self):
def test_user_managed_role(self):
    """Test user managed role

    Grants a permission through the managed role, checks that exactly one
    role exists and the permission is held, then revokes it again.
    """
    permission = "authentik_core.view_user"
    account = User.objects.create(username=generate_id())

    account.assign_perms_to_managed_role(permission)
    role_count = account.roles.count()
    self.assertEqual(role_count, 1)
    self.assertTrue(account.has_perm(permission))

    account.remove_perms_from_managed_role(permission)
    self.assertFalse(account.has_perm(permission))
Test user managed role
def
test_user_ak_groups(self):
def test_user_ak_groups(self):
    """Test user.ak_groups is a proxy for user.groups

    Both attributes of a freshly created user must compare equal.
    """
    account = User.objects.create(username=generate_id())
    via_ak_groups = account.ak_groups
    via_groups = account.groups
    self.assertEqual(via_ak_groups, via_groups)
Test user.ak_groups is a proxy for user.groups
def
test_user_ak_groups_event(self):
def test_user_ak_groups_event(self):
    """Test user.ak_groups creates exactly one event

    The event count starts at zero and must be one after the first and
    after the second access.
    """
    account = User.objects.create(username=generate_id())
    self.assertEqual(Event.objects.count(), 0)
    # The expected count is 1 after each access: the second one adds nothing.
    for _ in range(2):
        account.ak_groups.all()
        self.assertEqual(Event.objects.count(), 1)
Test user.ak_groups creates exactly one event
def
test_set_password_from_hash_signal_skips_source_sync_receivers(self):
def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
    """Test hash password updates do not expose a raw password to sync receivers.

    Sets a pre-hashed password while the LDAP and Kerberos lookups are
    patched, then checks that only ``password_hash_changed`` reached its
    receiver and that neither patched lookup was called.
    """
    account = User.objects.create(
        username=generate_id(),
        attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
    )
    raw_signal_calls = []
    hash_signal_calls = []
    raw_uid = generate_id()
    hash_uid = generate_id()

    def record_raw_change(sender, **kwargs):
        raw_signal_calls.append(kwargs)

    def record_hash_change(sender, **kwargs):
        hash_signal_calls.append(kwargs)

    password_changed.connect(record_raw_change, dispatch_uid=raw_uid)
    password_hash_changed.connect(record_hash_change, dispatch_uid=hash_uid)

    ldap_lookup_path = "authentik.sources.ldap.signals.LDAPSource.objects.filter"
    kerberos_lookup_path = (
        "authentik.sources.kerberos.signals."
        "UserKerberosSourceConnection.objects.select_related"
    )
    try:
        with (
            patch(ldap_lookup_path) as ldap_lookup,
            patch(kerberos_lookup_path) as kerberos_lookup,
        ):
            hashed_password = make_password("new-password")  # nosec
            account.set_password_from_hash(hashed_password)
            account.save()
    finally:
        # Detach both receivers whether or not the update succeeded.
        password_changed.disconnect(dispatch_uid=raw_uid)
        password_hash_changed.disconnect(dispatch_uid=hash_uid)

    self.assertEqual(raw_signal_calls, [])
    self.assertEqual(len(hash_signal_calls), 1)
    ldap_lookup.assert_not_called()
    kerberos_lookup.assert_not_called()
Test hash password updates do not expose a raw password to sync receivers.
class
TestUserSerializerPasswordHash(django.test.testcases.TestCase):
class TestUserSerializerPasswordHash(TestCase):
    """Test UserSerializer password_hash support in blueprint context.

    Exercises a valid hash, an invalid hash, and the serializer used without
    the blueprint context.
    """

    def test_password_hash_sets_password_directly(self):
        """Test a valid password hash is stored without re-hashing.

        The saved user must carry the supplied hash verbatim, accept the
        original plaintext, and have a non-null ``password_change_date``.
        """
        plaintext = "test-password-123"  # nosec
        hashed = make_password(plaintext)
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": hashed,
        }
        serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        created = serializer.save()

        self.assertEqual(created.password, hashed)
        self.assertTrue(created.check_password(plaintext))
        self.assertIsNotNone(created.password_change_date)

    def test_password_hash_rejects_invalid_format(self):
        """Test invalid password hash values are rejected.

        Validation itself passes; the ``ValidationError`` is expected from
        ``save()``.
        """
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": "not-a-valid-hash",
        }
        serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        with self.assertRaises(ValidationError) as raised:
            serializer.save()

        message = str(raised.exception)
        self.assertIn("Invalid password hash format", message)

    def test_password_hash_ignored_outside_blueprint_context(self):
        """Test password_hash is not accepted by the regular serializer.

        Without the blueprint context the data still validates, but
        ``password_hash`` is absent from ``validated_data``.
        """
        payload = {
            "username": generate_id(),
            "name": "Test User",
            "password_hash": make_password("test"),  # nosec
        }
        serializer = UserSerializer(data=payload)

        is_valid = serializer.is_valid()
        self.assertTrue(is_valid, serializer.errors)
        self.assertNotIn("password_hash", serializer.validated_data)
Test UserSerializer password_hash support in blueprint context.
def
test_password_hash_sets_password_directly(self):
def test_password_hash_sets_password_directly(self):
    """Test a valid password hash is stored without re-hashing.

    The saved user must carry the supplied hash verbatim, accept the
    original plaintext, and have a non-null ``password_change_date``.
    """
    plaintext = "test-password-123"  # nosec
    hashed = make_password(plaintext)
    payload = {
        "username": generate_id(),
        "name": "Test User",
        "password_hash": hashed,
    }
    serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

    is_valid = serializer.is_valid()
    self.assertTrue(is_valid, serializer.errors)
    created = serializer.save()

    self.assertEqual(created.password, hashed)
    self.assertTrue(created.check_password(plaintext))
    self.assertIsNotNone(created.password_change_date)
Test a valid password hash is stored without re-hashing.
def
test_password_hash_rejects_invalid_format(self):
def test_password_hash_rejects_invalid_format(self):
    """Test invalid password hash values are rejected.

    Validation itself passes; the ``ValidationError`` is expected from
    ``save()``.
    """
    payload = {
        "username": generate_id(),
        "name": "Test User",
        "password_hash": "not-a-valid-hash",
    }
    serializer = UserSerializer(data=payload, context={SERIALIZER_CONTEXT_BLUEPRINT: True})

    is_valid = serializer.is_valid()
    self.assertTrue(is_valid, serializer.errors)
    with self.assertRaises(ValidationError) as raised:
        serializer.save()

    message = str(raised.exception)
    self.assertIn("Invalid password hash format", message)
Test invalid password hash values are rejected.
def
test_password_hash_ignored_outside_blueprint_context(self):
def test_password_hash_ignored_outside_blueprint_context(self):
    """Test password_hash is not accepted by the regular serializer.

    Without the blueprint context the data still validates, but
    ``password_hash`` is absent from ``validated_data``.
    """
    payload = {
        "username": generate_id(),
        "name": "Test User",
        "password_hash": make_password("test"),  # nosec
    }
    serializer = UserSerializer(data=payload)

    is_valid = serializer.is_valid()
    self.assertTrue(is_valid, serializer.errors)
    self.assertNotIn("password_hash", serializer.validated_data)
Test password_hash is not accepted by the regular serializer.