authentik.core.tests.test_users

user tests

  1"""user tests"""
  2
  3from unittest.mock import patch
  4
  5from django.contrib.auth.hashers import make_password
  6from django.test.testcases import TestCase
  7from rest_framework.exceptions import ValidationError
  8
  9from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 10from authentik.core.api.users import UserSerializer
 11from authentik.core.models import User
 12from authentik.core.signals import password_changed, password_hash_changed
 13from authentik.events.models import Event
 14from authentik.lib.generators import generate_id
 15
 16
 17class TestUsers(TestCase):
 18    """Test user"""
 19
 20    def test_user_managed_role(self):
 21        """Test user managed role"""
 22        perm = "authentik_core.view_user"
 23        user = User.objects.create(username=generate_id())
 24        user.assign_perms_to_managed_role(perm)
 25        self.assertEqual(user.roles.count(), 1)
 26        self.assertTrue(user.has_perm(perm))
 27        user.remove_perms_from_managed_role(perm)
 28        self.assertFalse(user.has_perm(perm))
 29
 30    def test_user_ak_groups(self):
 31        """Test user.ak_groups is a proxy for user.groups"""
 32        user = User.objects.create(username=generate_id())
 33        self.assertEqual(user.ak_groups, user.groups)
 34
 35    def test_user_ak_groups_event(self):
 36        """Test user.ak_groups creates exactly one event"""
 37        user = User.objects.create(username=generate_id())
 38        self.assertEqual(Event.objects.count(), 0)
 39        user.ak_groups.all()
 40        self.assertEqual(Event.objects.count(), 1)
 41        user.ak_groups.all()
 42        self.assertEqual(Event.objects.count(), 1)
 43
 44    def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
 45        """Test hash password updates do not expose a raw password to sync receivers."""
 46        user = User.objects.create(
 47            username=generate_id(),
 48            attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
 49        )
 50        password_changed_captured = []
 51        password_hash_changed_captured = []
 52        dispatch_uid = generate_id()
 53        hash_dispatch_uid = generate_id()
 54
 55        def password_changed_receiver(sender, **kwargs):
 56            password_changed_captured.append(kwargs)
 57
 58        def password_hash_changed_receiver(sender, **kwargs):
 59            password_hash_changed_captured.append(kwargs)
 60
 61        password_changed.connect(password_changed_receiver, dispatch_uid=dispatch_uid)
 62        password_hash_changed.connect(
 63            password_hash_changed_receiver, dispatch_uid=hash_dispatch_uid
 64        )
 65        try:
 66            with (
 67                patch(
 68                    "authentik.sources.ldap.signals.LDAPSource.objects.filter"
 69                ) as ldap_sources_filter,
 70                patch(
 71                    "authentik.sources.kerberos.signals."
 72                    "UserKerberosSourceConnection.objects.select_related"
 73                ) as kerberos_connections_select,
 74            ):
 75                user.set_password_from_hash(make_password("new-password"))  # nosec
 76                user.save()
 77        finally:
 78            password_changed.disconnect(dispatch_uid=dispatch_uid)
 79            password_hash_changed.disconnect(dispatch_uid=hash_dispatch_uid)
 80
 81        self.assertEqual(password_changed_captured, [])
 82        self.assertEqual(len(password_hash_changed_captured), 1)
 83        ldap_sources_filter.assert_not_called()
 84        kerberos_connections_select.assert_not_called()
 85
 86
 87class TestUserSerializerPasswordHash(TestCase):
 88    """Test UserSerializer password_hash support in blueprint context."""
 89
 90    def test_password_hash_sets_password_directly(self):
 91        """Test a valid password hash is stored without re-hashing."""
 92        password = "test-password-123"  # nosec
 93        password_hash = make_password(password)
 94        serializer = UserSerializer(
 95            data={
 96                "username": generate_id(),
 97                "name": "Test User",
 98                "password_hash": password_hash,
 99            },
100            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
101        )
102
103        self.assertTrue(serializer.is_valid(), serializer.errors)
104        user = serializer.save()
105
106        self.assertEqual(user.password, password_hash)
107        self.assertTrue(user.check_password(password))
108        self.assertIsNotNone(user.password_change_date)
109
110    def test_password_hash_rejects_invalid_format(self):
111        """Test invalid password hash values are rejected."""
112        serializer = UserSerializer(
113            data={
114                "username": generate_id(),
115                "name": "Test User",
116                "password_hash": "not-a-valid-hash",
117            },
118            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
119        )
120
121        self.assertTrue(serializer.is_valid(), serializer.errors)
122        with self.assertRaises(ValidationError) as ctx:
123            serializer.save()
124
125        self.assertIn("Invalid password hash format", str(ctx.exception))
126
127    def test_password_hash_ignored_outside_blueprint_context(self):
128        """Test password_hash is not accepted by the regular serializer."""
129        serializer = UserSerializer(
130            data={
131                "username": generate_id(),
132                "name": "Test User",
133                "password_hash": make_password("test"),  # nosec
134            }
135        )
136
137        self.assertTrue(serializer.is_valid(), serializer.errors)
138        self.assertNotIn("password_hash", serializer.validated_data)
class TestUsers(django.test.testcases.TestCase):
class TestUsers(TestCase):
    """Test cases covering the ``User`` model."""

    def test_user_managed_role(self):
        """Assign a permission via the managed role and then take it away."""
        view_perm = "authentik_core.view_user"
        subject = User.objects.create(username=generate_id())

        subject.assign_perms_to_managed_role(view_perm)
        role_total = subject.roles.count()
        self.assertEqual(role_total, 1)
        self.assertTrue(subject.has_perm(view_perm))

        subject.remove_perms_from_managed_role(view_perm)
        self.assertFalse(subject.has_perm(view_perm))

    def test_user_ak_groups(self):
        """Check that ``ak_groups`` and ``groups`` compare equal on a user."""
        subject = User.objects.create(username=generate_id())
        via_proxy = subject.ak_groups
        via_groups = subject.groups
        self.assertEqual(via_proxy, via_groups)

    def test_user_ak_groups_event(self):
        """Check that using ``ak_groups`` twice yields a single event."""
        subject = User.objects.create(username=generate_id())
        events = Event.objects
        self.assertEqual(events.count(), 0)

        subject.ak_groups.all()
        self.assertEqual(events.count(), 1)

        # A second access is expected to leave the count unchanged.
        subject.ak_groups.all()
        self.assertEqual(events.count(), 1)

    def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
        """Check that a hash-based password update bypasses the source sync receivers.

        The test listens on both password signals, patches the LDAP and Kerberos
        lookups, and asserts that only ``password_hash_changed`` was emitted.
        """
        subject = User.objects.create(
            username=generate_id(),
            attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
        )
        plain_events = []
        hash_events = []
        plain_uid = generate_id()
        hash_uid = generate_id()

        def on_password_changed(sender, **kwargs):
            plain_events.append(kwargs)

        def on_password_hash_changed(sender, **kwargs):
            hash_events.append(kwargs)

        ldap_path = "authentik.sources.ldap.signals.LDAPSource.objects.filter"
        kerberos_path = (
            "authentik.sources.kerberos.signals."
            "UserKerberosSourceConnection.objects.select_related"
        )

        password_changed.connect(on_password_changed, dispatch_uid=plain_uid)
        password_hash_changed.connect(on_password_hash_changed, dispatch_uid=hash_uid)
        try:
            with patch(ldap_path) as ldap_filter_mock, patch(kerberos_path) as kerberos_mock:
                subject.set_password_from_hash(make_password("new-password"))  # nosec
                subject.save()
        finally:
            password_changed.disconnect(dispatch_uid=plain_uid)
            password_hash_changed.disconnect(dispatch_uid=hash_uid)

        self.assertEqual(plain_events, [])
        self.assertEqual(len(hash_events), 1)
        ldap_filter_mock.assert_not_called()
        kerberos_mock.assert_not_called()

Test user

def test_user_managed_role(self):
21    def test_user_managed_role(self):
22        """Test user managed role"""
23        perm = "authentik_core.view_user"
24        user = User.objects.create(username=generate_id())
25        user.assign_perms_to_managed_role(perm)
26        self.assertEqual(user.roles.count(), 1)
27        self.assertTrue(user.has_perm(perm))
28        user.remove_perms_from_managed_role(perm)
29        self.assertFalse(user.has_perm(perm))

Test user managed role

def test_user_ak_groups(self):
31    def test_user_ak_groups(self):
32        """Test user.ak_groups is a proxy for user.groups"""
33        user = User.objects.create(username=generate_id())
34        self.assertEqual(user.ak_groups, user.groups)

Test user.ak_groups is a proxy for user.groups

def test_user_ak_groups_event(self):
36    def test_user_ak_groups_event(self):
37        """Test user.ak_groups creates exactly one event"""
38        user = User.objects.create(username=generate_id())
39        self.assertEqual(Event.objects.count(), 0)
40        user.ak_groups.all()
41        self.assertEqual(Event.objects.count(), 1)
42        user.ak_groups.all()
43        self.assertEqual(Event.objects.count(), 1)

Test user.ak_groups creates exactly one event

def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
45    def test_set_password_from_hash_signal_skips_source_sync_receivers(self):
46        """Test hash password updates do not expose a raw password to sync receivers."""
47        user = User.objects.create(
48            username=generate_id(),
49            attributes={"distinguishedName": "cn=test,ou=users,dc=example,dc=com"},
50        )
51        password_changed_captured = []
52        password_hash_changed_captured = []
53        dispatch_uid = generate_id()
54        hash_dispatch_uid = generate_id()
55
56        def password_changed_receiver(sender, **kwargs):
57            password_changed_captured.append(kwargs)
58
59        def password_hash_changed_receiver(sender, **kwargs):
60            password_hash_changed_captured.append(kwargs)
61
62        password_changed.connect(password_changed_receiver, dispatch_uid=dispatch_uid)
63        password_hash_changed.connect(
64            password_hash_changed_receiver, dispatch_uid=hash_dispatch_uid
65        )
66        try:
67            with (
68                patch(
69                    "authentik.sources.ldap.signals.LDAPSource.objects.filter"
70                ) as ldap_sources_filter,
71                patch(
72                    "authentik.sources.kerberos.signals."
73                    "UserKerberosSourceConnection.objects.select_related"
74                ) as kerberos_connections_select,
75            ):
76                user.set_password_from_hash(make_password("new-password"))  # nosec
77                user.save()
78        finally:
79            password_changed.disconnect(dispatch_uid=dispatch_uid)
80            password_hash_changed.disconnect(dispatch_uid=hash_dispatch_uid)
81
82        self.assertEqual(password_changed_captured, [])
83        self.assertEqual(len(password_hash_changed_captured), 1)
84        ldap_sources_filter.assert_not_called()
85        kerberos_connections_select.assert_not_called()

Test hash password updates do not expose a raw password to sync receivers.

class TestUserSerializerPasswordHash(django.test.testcases.TestCase):
 88class TestUserSerializerPasswordHash(TestCase):
 89    """Test UserSerializer password_hash support in blueprint context."""
 90
 91    def test_password_hash_sets_password_directly(self):
 92        """Test a valid password hash is stored without re-hashing."""
 93        password = "test-password-123"  # nosec
 94        password_hash = make_password(password)
 95        serializer = UserSerializer(
 96            data={
 97                "username": generate_id(),
 98                "name": "Test User",
 99                "password_hash": password_hash,
100            },
101            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
102        )
103
104        self.assertTrue(serializer.is_valid(), serializer.errors)
105        user = serializer.save()
106
107        self.assertEqual(user.password, password_hash)
108        self.assertTrue(user.check_password(password))
109        self.assertIsNotNone(user.password_change_date)
110
111    def test_password_hash_rejects_invalid_format(self):
112        """Test invalid password hash values are rejected."""
113        serializer = UserSerializer(
114            data={
115                "username": generate_id(),
116                "name": "Test User",
117                "password_hash": "not-a-valid-hash",
118            },
119            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
120        )
121
122        self.assertTrue(serializer.is_valid(), serializer.errors)
123        with self.assertRaises(ValidationError) as ctx:
124            serializer.save()
125
126        self.assertIn("Invalid password hash format", str(ctx.exception))
127
128    def test_password_hash_ignored_outside_blueprint_context(self):
129        """Test password_hash is not accepted by the regular serializer."""
130        serializer = UserSerializer(
131            data={
132                "username": generate_id(),
133                "name": "Test User",
134                "password_hash": make_password("test"),  # nosec
135            }
136        )
137
138        self.assertTrue(serializer.is_valid(), serializer.errors)
139        self.assertNotIn("password_hash", serializer.validated_data)

Test UserSerializer password_hash support in blueprint context.

def test_password_hash_sets_password_directly(self):
 91    def test_password_hash_sets_password_directly(self):
 92        """Test a valid password hash is stored without re-hashing."""
 93        password = "test-password-123"  # nosec
 94        password_hash = make_password(password)
 95        serializer = UserSerializer(
 96            data={
 97                "username": generate_id(),
 98                "name": "Test User",
 99                "password_hash": password_hash,
100            },
101            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
102        )
103
104        self.assertTrue(serializer.is_valid(), serializer.errors)
105        user = serializer.save()
106
107        self.assertEqual(user.password, password_hash)
108        self.assertTrue(user.check_password(password))
109        self.assertIsNotNone(user.password_change_date)

Test a valid password hash is stored without re-hashing.

def test_password_hash_rejects_invalid_format(self):
111    def test_password_hash_rejects_invalid_format(self):
112        """Test invalid password hash values are rejected."""
113        serializer = UserSerializer(
114            data={
115                "username": generate_id(),
116                "name": "Test User",
117                "password_hash": "not-a-valid-hash",
118            },
119            context={SERIALIZER_CONTEXT_BLUEPRINT: True},
120        )
121
122        self.assertTrue(serializer.is_valid(), serializer.errors)
123        with self.assertRaises(ValidationError) as ctx:
124            serializer.save()
125
126        self.assertIn("Invalid password hash format", str(ctx.exception))

Test invalid password hash values are rejected.

def test_password_hash_ignored_outside_blueprint_context(self):
128    def test_password_hash_ignored_outside_blueprint_context(self):
129        """Test password_hash is not accepted by the regular serializer."""
130        serializer = UserSerializer(
131            data={
132                "username": generate_id(),
133                "name": "Test User",
134                "password_hash": make_password("test"),  # nosec
135            }
136        )
137
138        self.assertTrue(serializer.is_valid(), serializer.errors)
139        self.assertNotIn("password_hash", serializer.validated_data)

Test password_hash is not accepted by the regular serializer.