authentik.stages.password.tests
password tests
"""password tests"""

from unittest.mock import MagicMock, patch

from django.core.exceptions import PermissionDenied
from django.urls import reverse

from authentik.core.tests.utils import create_test_admin_user, create_test_brand, create_test_flow
from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.stages.password import BACKEND_INBUILT
from authentik.stages.password.models import PasswordStage

# Stand-in for the inbuilt backend's ``authenticate`` that always raises PermissionDenied
MOCK_BACKEND_AUTHENTICATE = MagicMock(side_effect=PermissionDenied("test"))


class TestPasswordStage(FlowTestCase):
    """Password tests"""

    def setUp(self):
        """Create the admin user, the authentication flow and the bound password stage."""
        super().setUp()
        self.user = create_test_admin_user()

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)

    def _executor_url(self) -> str:
        """Return the flow executor API URL for the flow under test."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    def _store_plan(self, pending_user=None) -> FlowPlan:
        """Save a plan holding only the password stage binding into the client session.

        When ``pending_user`` is given, it is stored in the plan context as the pending user.
        """
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        if pending_user is not None:
            plan.context[PLAN_CONTEXT_PENDING_USER] = pending_user
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()
        return plan

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_without_user(self):
        """Test without a pending user in the plan"""
        self._store_plan()

        response = self.client.post(
            self._executor_url(),
            # Still have to send the password so the form is valid
            {"password": self.user.username},
        )

        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )

    def test_recovery_flow_link(self):
        """Test link to the default recovery flow"""
        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
        brand = create_test_brand()
        brand.flow_recovery = flow
        brand.save()

        self._store_plan()

        response = self.client.get(self._executor_url())
        self.assertEqual(response.status_code, 200)
        self.assertIn(flow.slug, response.content.decode())

    def test_valid_password(self):
        """Test with a valid pending user and valid password"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username},
        )

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_valid_password_inactive(self):
        """Test with an inactive pending user and valid password"""
        self.user.is_active = False
        self.user.save()
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username},
        )

        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
        )

    def test_invalid_password(self):
        """Test with a valid pending user and invalid password"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username + "test"},
        )
        self.assertEqual(response.status_code, 200)
        # A 200 alone does not show the password was rejected, so check the field error too
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
        )

    def test_invalid_password_lockout(self):
        """Test with a valid pending user and invalid password (trigger lockout counter)"""
        self._store_plan(self.user)
        wrong_password = {"password": self.user.username + "test"}

        res = self.client.get(self._executor_url())
        self.assertEqual(res.status_code, 200)
        # Every attempt below the limit is answered with a field error on the same stage
        for _ in range(self.stage.failed_attempts_before_cancel - 1):
            response = self.client.post(self._executor_url(), wrong_password)
            self.assertEqual(response.status_code, 200)
            self.assertStageResponse(
                response,
                flow=self.flow,
                response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
            )

        # This attempt reaches the configured limit
        response = self.client.post(self._executor_url(), wrong_password)
        self.assertEqual(response.status_code, 200)
        # To ensure the plan has been cancelled, check SESSION_KEY_PLAN
        self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
        self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.core.auth.InbuiltBackend.authenticate",
        MOCK_BACKEND_AUTHENTICATE,
    )
    def test_permission_denied(self):
        """Test with a valid pending user and invalid password.
        Backend is patched to raise PermissionDenied"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username + "test"},
        )

        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )
MOCK_BACKEND_AUTHENTICATE =
<MagicMock id='139707749764464'>
class TestPasswordStage(FlowTestCase):
    """Password tests"""

    def setUp(self):
        """Create the admin user, the authentication flow and the bound password stage."""
        super().setUp()
        self.user = create_test_admin_user()

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.stage = PasswordStage.objects.create(name=generate_id(), backends=[BACKEND_INBUILT])
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)

    def _executor_url(self) -> str:
        """Return the flow executor API URL for the flow under test."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    def _store_plan(self, pending_user=None) -> FlowPlan:
        """Save a plan holding only the password stage binding into the client session.

        When ``pending_user`` is given, it is stored in the plan context as the pending user.
        """
        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
        if pending_user is not None:
            plan.context[PLAN_CONTEXT_PENDING_USER] = pending_user
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()
        return plan

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    def test_without_user(self):
        """Test without a pending user in the plan"""
        self._store_plan()

        response = self.client.post(
            self._executor_url(),
            # Still have to send the password so the form is valid
            {"password": self.user.username},
        )

        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )

    def test_recovery_flow_link(self):
        """Test link to the default recovery flow"""
        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
        brand = create_test_brand()
        brand.flow_recovery = flow
        brand.save()

        self._store_plan()

        response = self.client.get(self._executor_url())
        self.assertEqual(response.status_code, 200)
        self.assertIn(flow.slug, response.content.decode())

    def test_valid_password(self):
        """Test with a valid pending user and valid password"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username},
        )

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_valid_password_inactive(self):
        """Test with an inactive pending user and valid password"""
        self.user.is_active = False
        self.user.save()
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username},
        )

        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
        )

    def test_invalid_password(self):
        """Test with a valid pending user and invalid password"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username + "test"},
        )
        self.assertEqual(response.status_code, 200)
        # A 200 alone does not show the password was rejected, so check the field error too
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
        )

    def test_invalid_password_lockout(self):
        """Test with a valid pending user and invalid password (trigger lockout counter)"""
        self._store_plan(self.user)
        wrong_password = {"password": self.user.username + "test"}

        res = self.client.get(self._executor_url())
        self.assertEqual(res.status_code, 200)
        # Every attempt below the limit is answered with a field error on the same stage
        for _ in range(self.stage.failed_attempts_before_cancel - 1):
            response = self.client.post(self._executor_url(), wrong_password)
            self.assertEqual(response.status_code, 200)
            self.assertStageResponse(
                response,
                flow=self.flow,
                response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
            )

        # This attempt reaches the configured limit
        response = self.client.post(self._executor_url(), wrong_password)
        self.assertEqual(response.status_code, 200)
        # To ensure the plan has been cancelled, check SESSION_KEY_PLAN
        self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
        self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")

    @patch(
        "authentik.flows.views.executor.to_stage_response",
        TO_STAGE_RESPONSE_MOCK,
    )
    @patch(
        "authentik.core.auth.InbuiltBackend.authenticate",
        MOCK_BACKEND_AUTHENTICATE,
    )
    def test_permission_denied(self):
        """Test with a valid pending user and invalid password.
        Backend is patched to raise PermissionDenied"""
        self._store_plan(self.user)

        response = self.client.post(
            self._executor_url(),
            # Form data
            {"password": self.user.username + "test"},
        )

        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )
Password tests
def
setUp(self):
def setUp(self):
    """Create the admin user, the authentication flow and the bound password stage."""
    super().setUp()
    self.user = create_test_admin_user()

    self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    # The stage authenticates against the inbuilt backend only
    self.stage = PasswordStage.objects.create(
        name=generate_id(),
        backends=[BACKEND_INBUILT],
    )
    self.binding = FlowStageBinding.objects.create(
        target=self.flow,
        stage=self.stage,
        order=2,
    )
Hook method for setting up the test fixture before exercising it.
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def
test_without_user(self):
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
def test_without_user(self):
    """Submit a password while the plan has no pending user and expect access denied"""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    # A password still has to be sent so that the form is valid
    form_data = {"password": self.user.username}
    response = self.client.post(executor_url, form_data)

    self.assertStageResponse(
        response,
        self.flow,
        component="ak-stage-access-denied",
        error_message="Unknown error",
    )
Test without user
def
test_recovery_flow_link(self):
def test_recovery_flow_link(self):
    """Check that the stage response contains the slug of the brand's recovery flow"""
    recovery_flow = create_test_flow(designation=FlowDesignation.RECOVERY)
    test_brand = create_test_brand()
    test_brand.flow_recovery = recovery_flow
    test_brand.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.get(executor_url)
    self.assertEqual(response.status_code, 200)
    body = response.content.decode()
    self.assertIn(recovery_flow.slug, body)
Test link to the default recovery flow
def
test_valid_password(self):
def test_valid_password(self):
    """Submit the correct password for the pending user and expect the root redirect"""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    # Form data
    form_data = {"password": self.user.username}
    response = self.client.post(executor_url, form_data)

    self.assertEqual(response.status_code, 200)
    expected_target = reverse("authentik_core:root-redirect")
    self.assertStageRedirects(response, expected_target)
Test with a valid pending user and valid password
def
test_valid_password_inactive(self):
def test_valid_password_inactive(self):
    """Submit the correct password for an inactive pending user and expect "Invalid password\""""
    self.user.is_active = False
    self.user.save()

    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    # Form data
    form_data = {"password": self.user.username}
    response = self.client.post(executor_url, form_data)

    self.assertEqual(response.status_code, 200)
    expected_errors = {"password": [{"string": "Invalid password", "code": "invalid"}]}
    self.assertStageResponse(
        response,
        self.flow,
        response_errors=expected_errors,
    )
Test with an inactive pending user and a valid password
def
test_invalid_password(self):
def test_invalid_password(self):
    """Test with a valid pending user and invalid password

    The stage is expected to answer with an "Invalid password" error on the
    ``password`` field."""
    plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
    plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    session = self.client.session
    session[SESSION_KEY_PLAN] = plan
    session.save()

    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        # Form data
        {"password": self.user.username + "test"},
    )
    self.assertEqual(response.status_code, 200)
    # A 200 alone does not show the password was rejected, so check the field error too
    self.assertStageResponse(
        response,
        self.flow,
        response_errors={"password": [{"string": "Invalid password", "code": "invalid"}]},
    )
Test with a valid pending user and invalid password
def
test_invalid_password_lockout(self):
def test_invalid_password_lockout(self):
    """Submit wrong passwords until the failed-attempt limit removes the plan from the session"""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    initial = self.client.get(executor_url)
    self.assertEqual(initial.status_code, 200)

    # Form data
    wrong_password = {"password": self.user.username + "test"}
    field_errors = {"password": [{"string": "Invalid password", "code": "invalid"}]}
    attempts_below_limit = self.stage.failed_attempts_before_cancel - 1
    for _attempt in range(attempts_below_limit):
        response = self.client.post(executor_url, wrong_password)
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(response, flow=self.flow, response_errors=field_errors)

    # One more wrong password reaches the configured limit
    response = self.client.post(executor_url, wrong_password)
    self.assertEqual(response.status_code, 200)
    # The plan must be gone from the session once it has been cancelled
    self.assertNotIn(SESSION_KEY_PLAN, self.client.session)
    self.assertStageResponse(response, flow=self.flow, error_message="Invalid password")
Test with a valid pending user and invalid password (trigger lockout counter)
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
@patch('authentik.core.auth.InbuiltBackend.authenticate', MOCK_BACKEND_AUTHENTICATE)
def
test_permission_denied(self):
@patch(
    "authentik.flows.views.executor.to_stage_response",
    TO_STAGE_RESPONSE_MOCK,
)
@patch(
    "authentik.core.auth.InbuiltBackend.authenticate",
    MOCK_BACKEND_AUTHENTICATE,
)
def test_permission_denied(self):
    """Submit a password while the inbuilt backend's authenticate is patched.

    The patched backend is MOCK_BACKEND_AUTHENTICATE; the stage is expected
    to answer with the access-denied component."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    # Form data
    form_data = {"password": self.user.username + "test"}
    response = self.client.post(executor_url, form_data)

    self.assertStageResponse(
        response,
        self.flow,
        component="ak-stage-access-denied",
        error_message="Unknown error",
    )
Test with a valid pending user and an invalid password. The backend is patched to raise PermissionDenied