authentik.stages.password.tests

password tests

  1"""password tests"""
  2
  3from unittest.mock import MagicMock, patch
  4
  5from django.core.exceptions import PermissionDenied
  6from django.urls import reverse
  7
  8from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow
  9from authentik.flows.markers import StageMarker
 10from authentik.flows.models import FlowDesignation, FlowStageBinding
 11from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 12from authentik.flows.tests import FlowTestCase
 13from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK
 14from authentik.flows.views.executor import SESSION_KEY_PLAN
 15from authentik.lib.generators import generate_id
 16from authentik.stages.password import BACKEND_INBUILT
 17from authentik.stages.password.models import PasswordStage
 18
 19MOCK_BACKEND_AUTHENTICATE = MagicMock(side_effect=PermissionDenied("test"))
 20
 21
 22class TestPasswordStage(FlowTestCase):
 23    """Password tests"""
 24
 25    def setUp(self):
 26        super().setUp()
 27        self.user = create_test_admin_user()
 28
 29        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 30        self.stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
 31        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 32
 33    @patch(
 34        "authentik.flows.views.executor.to_stage_response",
 35        TO_STAGE_RESPONSE_MOCK,
 36    )
 37    def test_without_user(self):
 38        """Test without user"""
 39        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 40        session = self.client.session
 41        session[SESSION_KEY_PLAN] = plan
 42        session.save()
 43
 44        response = self.client.post(
 45            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 46            # Still have to send the password so the form is valid
 47            {"password": self.user.username},
 48        )
 49
 50        self.assertStageResponse(
 51            response,
 52            self.flow,
 53            component="ak-stage-access-denied",
 54            error_message="Unknown error",
 55        )
 56
 57    def test_recovery_flow_link(self):
 58        """Test link to the default recovery flow"""
 59        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
 60        brand = create_test_brand()
 61        brand.flow_recovery = flow
 62        brand.save()
 63
 64        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 65        session = self.client.session
 66        session[SESSION_KEY_PLAN] = plan
 67        session.save()
 68
 69        response = self.client.get(
 70            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 71        )
 72        self.assertEqual(response.status_code, 200)
 73        self.assertIn(flow.slug, response.content.decode())
 74
 75    def test_valid_password(self):
 76        """Test with a valid pending user and valid password"""
 77        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 78        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 79        session = self.client.session
 80        session[SESSION_KEY_PLAN] = plan
 81        session.save()
 82
 83        response = self.client.post(
 84            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 85            # Form data
 86            {"password": self.user.username},
 87        )
 88
 89        self.assertEqual(response.status_code, 200)
 90        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 91
 92    def test_valid_password_inactive(self):
 93        """Test with a valid pending user and valid password"""
 94        self.user.is_active = False
 95        self.user.save()
 96        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 97        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 98        session = self.client.session
 99        session[SESSION_KEY_PLAN] = plan
100        session.save()
101
102        response = self.client.post(
103            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
104            # Form data
105            {"password": self.user.username},
106        )
107
108        self.assertEqual(response.status_code, 200)
109        self.assertStageResponse(
110            response,
111            self.flow,
112            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
113        )
114
115    def test_invalid_password(self):
116        """Test with a valid pending user and invalid password"""
117        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
118        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
119        session = self.client.session
120        session[SESSION_KEY_PLAN] = plan
121        session.save()
122
123        response = self.client.post(
124            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
125            # Form data
126            {"password": self.user.username + "test"},
127        )
128        self.assertEqual(response.status_code, 200)
129
130    def test_invalid_password_lockout(self):
131        """Test with a valid pending user and invalid password (trigger logout counter)"""
132        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
133        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
134        session = self.client.session
135        session[SESSION_KEY_PLAN] = plan
136        session.save()
137
138        res = self.client.get(
139            reverse(
140                "authentik_api:flow-executor",
141                kwargs={"flow_slug": self.flow.slug},
142            ),
143        )
144        self.assertEqual(res.status_code, 200)
145        for _ in range(self.stage.failed_attempts_before_cancel - 1):
146            response = self.client.post(
147                reverse(
148                    "authentik_api:flow-executor",
149                    kwargs={"flow_slug": self.flow.slug},
150                ),
151                # Form data
152                {"password": self.user.username + "test"},
153            )
154            self.assertEqual(response.status_code, 200)
155            self.assertStageResponse(
156                response,
157                flow=self.flow,
158                response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
159            )
160
161        response = self.client.post(
162            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
163            # Form data
164            {"password": self.user.username + "test"},
165        )
166        self.assertEqual(response.status_code, 200)
167        # To ensure the plan has been cancelled, check SESSION_KEY_PLAN
168        self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
169        self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")
170
171    @patch(
172        "authentik.flows.views.executor.to_stage_response",
173        TO_STAGE_RESPONSE_MOCK,
174    )
175    @patch(
176        "authentik.core.auth.InbuiltBackend.authenticate",
177        MOCK_BACKEND_AUTHENTICATE,
178    )
179    def test_permission_denied(self):
180        """Test with a valid pending user and valid password.
181        Backend is patched to return PermissionError"""
182        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
183        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
184        session = self.client.session
185        session[SESSION_KEY_PLAN] = plan
186        session.save()
187
188        response = self.client.post(
189            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
190            # Form data
191            {"password": self.user.username + "test"},
192        )
193
194        self.assertStageResponse(
195            response,
196            self.flow,
197            component="ak-stage-access-denied",
198            error_message="Unknown error",
199        )
MOCK_BACKEND_AUTHENTICATE = <MagicMock id='139707749764464'>
class TestPasswordStage(authentik.flows.tests.FlowTestCase):
 23class TestPasswordStage(FlowTestCase):
 24    """Password tests"""
 25
 26    def setUp(self):
 27        super().setUp()
 28        self.user = create_test_admin_user()
 29
 30        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 31        self.stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
 32        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 33
 34    @patch(
 35        "authentik.flows.views.executor.to_stage_response",
 36        TO_STAGE_RESPONSE_MOCK,
 37    )
 38    def test_without_user(self):
 39        """Test without user"""
 40        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 41        session = self.client.session
 42        session[SESSION_KEY_PLAN] = plan
 43        session.save()
 44
 45        response = self.client.post(
 46            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 47            # Still have to send the password so the form is valid
 48            {"password": self.user.username},
 49        )
 50
 51        self.assertStageResponse(
 52            response,
 53            self.flow,
 54            component="ak-stage-access-denied",
 55            error_message="Unknown error",
 56        )
 57
 58    def test_recovery_flow_link(self):
 59        """Test link to the default recovery flow"""
 60        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
 61        brand = create_test_brand()
 62        brand.flow_recovery = flow
 63        brand.save()
 64
 65        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 66        session = self.client.session
 67        session[SESSION_KEY_PLAN] = plan
 68        session.save()
 69
 70        response = self.client.get(
 71            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 72        )
 73        self.assertEqual(response.status_code, 200)
 74        self.assertIn(flow.slug, response.content.decode())
 75
 76    def test_valid_password(self):
 77        """Test with a valid pending user and valid password"""
 78        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 79        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 80        session = self.client.session
 81        session[SESSION_KEY_PLAN] = plan
 82        session.save()
 83
 84        response = self.client.post(
 85            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 86            # Form data
 87            {"password": self.user.username},
 88        )
 89
 90        self.assertEqual(response.status_code, 200)
 91        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 92
 93    def test_valid_password_inactive(self):
 94        """Test with a valid pending user and valid password"""
 95        self.user.is_active = False
 96        self.user.save()
 97        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 98        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 99        session = self.client.session
100        session[SESSION_KEY_PLAN] = plan
101        session.save()
102
103        response = self.client.post(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
105            # Form data
106            {"password": self.user.username},
107        )
108
109        self.assertEqual(response.status_code, 200)
110        self.assertStageResponse(
111            response,
112            self.flow,
113            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
114        )
115
116    def test_invalid_password(self):
117        """Test with a valid pending user and invalid password"""
118        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
119        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
120        session = self.client.session
121        session[SESSION_KEY_PLAN] = plan
122        session.save()
123
124        response = self.client.post(
125            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
126            # Form data
127            {"password": self.user.username + "test"},
128        )
129        self.assertEqual(response.status_code, 200)
130
131    def test_invalid_password_lockout(self):
132        """Test with a valid pending user and invalid password (trigger logout counter)"""
133        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
134        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
135        session = self.client.session
136        session[SESSION_KEY_PLAN] = plan
137        session.save()
138
139        res = self.client.get(
140            reverse(
141                "authentik_api:flow-executor",
142                kwargs={"flow_slug": self.flow.slug},
143            ),
144        )
145        self.assertEqual(res.status_code, 200)
146        for _ in range(self.stage.failed_attempts_before_cancel - 1):
147            response = self.client.post(
148                reverse(
149                    "authentik_api:flow-executor",
150                    kwargs={"flow_slug": self.flow.slug},
151                ),
152                # Form data
153                {"password": self.user.username + "test"},
154            )
155            self.assertEqual(response.status_code, 200)
156            self.assertStageResponse(
157                response,
158                flow=self.flow,
159                response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
160            )
161
162        response = self.client.post(
163            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
164            # Form data
165            {"password": self.user.username + "test"},
166        )
167        self.assertEqual(response.status_code, 200)
168        # To ensure the plan has been cancelled, check SESSION_KEY_PLAN
169        self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
170        self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")
171
172    @patch(
173        "authentik.flows.views.executor.to_stage_response",
174        TO_STAGE_RESPONSE_MOCK,
175    )
176    @patch(
177        "authentik.core.auth.InbuiltBackend.authenticate",
178        MOCK_BACKEND_AUTHENTICATE,
179    )
180    def test_permission_denied(self):
181        """Test with a valid pending user and valid password.
182        Backend is patched to return PermissionError"""
183        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
184        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
185        session = self.client.session
186        session[SESSION_KEY_PLAN] = plan
187        session.save()
188
189        response = self.client.post(
190            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
191            # Form data
192            {"password": self.user.username + "test"},
193        )
194
195        self.assertStageResponse(
196            response,
197            self.flow,
198            component="ak-stage-access-denied",
199            error_message="Unknown error",
200        )

Password tests

def setUp(self):
26    def setUp(self):
27        super().setUp()
28        self.user = create_test_admin_user()
29
30        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
31        self.stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
32        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)

Hook method for setting up the test fixture before exercising it.

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_without_user(self):
34    @patch(
35        "authentik.flows.views.executor.to_stage_response",
36        TO_STAGE_RESPONSE_MOCK,
37    )
38    def test_without_user(self):
39        """Test without user"""
40        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
41        session = self.client.session
42        session[SESSION_KEY_PLAN] = plan
43        session.save()
44
45        response = self.client.post(
46            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
47            # Still have to send the password so the form is valid
48            {"password": self.user.username},
49        )
50
51        self.assertStageResponse(
52            response,
53            self.flow,
54            component="ak-stage-access-denied",
55            error_message="Unknown error",
56        )

Test without user

def test_valid_password(self):
76    def test_valid_password(self):
77        """Test with a valid pending user and valid password"""
78        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
79        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
80        session = self.client.session
81        session[SESSION_KEY_PLAN] = plan
82        session.save()
83
84        response = self.client.post(
85            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
86            # Form data
87            {"password": self.user.username},
88        )
89
90        self.assertEqual(response.status_code, 200)
91        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with a valid pending user and valid password

def test_valid_password_inactive(self):
 93    def test_valid_password_inactive(self):
 94        """Test with a valid pending user and valid password"""
 95        self.user.is_active = False
 96        self.user.save()
 97        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 98        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 99        session = self.client.session
100        session[SESSION_KEY_PLAN] = plan
101        session.save()
102
103        response = self.client.post(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
105            # Form data
106            {"password": self.user.username},
107        )
108
109        self.assertEqual(response.status_code, 200)
110        self.assertStageResponse(
111            response,
112            self.flow,
113            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
114        )

Test with a valid pending user and valid password

def test_invalid_password(self):
116    def test_invalid_password(self):
117        """Test with a valid pending user and invalid password"""
118        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
119        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
120        session = self.client.session
121        session[SESSION_KEY_PLAN] = plan
122        session.save()
123
124        response = self.client.post(
125            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
126            # Form data
127            {"password": self.user.username + "test"},
128        )
129        self.assertEqual(response.status_code, 200)

Test with a valid pending user and invalid password

def test_invalid_password_lockout(self):
131    def test_invalid_password_lockout(self):
132        """Test with a valid pending user and invalid password (trigger logout counter)"""
133        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
134        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
135        session = self.client.session
136        session[SESSION_KEY_PLAN] = plan
137        session.save()
138
139        res = self.client.get(
140            reverse(
141                "authentik_api:flow-executor",
142                kwargs={"flow_slug": self.flow.slug},
143            ),
144        )
145        self.assertEqual(res.status_code, 200)
146        for _ in range(self.stage.failed_attempts_before_cancel - 1):
147            response = self.client.post(
148                reverse(
149                    "authentik_api:flow-executor",
150                    kwargs={"flow_slug": self.flow.slug},
151                ),
152                # Form data
153                {"password": self.user.username + "test"},
154            )
155            self.assertEqual(response.status_code, 200)
156            self.assertStageResponse(
157                response,
158                flow=self.flow,
159                response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
160            )
161
162        response = self.client.post(
163            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
164            # Form data
165            {"password": self.user.username + "test"},
166        )
167        self.assertEqual(response.status_code, 200)
168        # To ensure the plan has been cancelled, check SESSION_KEY_PLAN
169        self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
170        self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")

Test with a valid pending user and invalid password (trigger logout counter)

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.core.auth.InbuiltBackend.authenticate', MOCK_BACKEND_AUTHENTICATE)
def test_permission_denied(self):
172    @patch(
173        "authentik.flows.views.executor.to_stage_response",
174        TO_STAGE_RESPONSE_MOCK,
175    )
176    @patch(
177        "authentik.core.auth.InbuiltBackend.authenticate",
178        MOCK_BACKEND_AUTHENTICATE,
179    )
180    def test_permission_denied(self):
181        """Test with a valid pending user and valid password.
182        Backend is patched to return PermissionError"""
183        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
184        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
185        session = self.client.session
186        session[SESSION_KEY_PLAN] = plan
187        session.save()
188
189        response = self.client.post(
190            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
191            # Form data
192            {"password": self.user.username + "test"},
193        )
194
195        self.assertStageResponse(
196            response,
197            self.flow,
198            component="ak-stage-access-denied",
199            error_message="Unknown error",
200        )

Test with a valid pending user and valid password. Backend is patched to return PermissionError