authentik.stages.authenticator_validate.tests.test_throttling
"""Tests for OTP throttling in the authenticator validate stage."""

from django.test import TestCase
from django.test.client import RequestFactory
from django.urls.base import reverse
from rest_framework.exceptions import ValidationError

from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import FlowStageBinding
from authentik.flows.stage import StageView
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import FlowExecutorView
from authentik.lib.generators import generate_id
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
from authentik.stages.authenticator_sms.models import (
    AuthenticatorSMSStage,
    SMSDevice,
    SMSProviders,
)
from authentik.stages.authenticator_validate.challenge import validate_challenge_code
from authentik.stages.authenticator_validate.models import (
    AuthenticatorValidateStage,
    DeviceClasses,
)
from authentik.stages.identification.models import IdentificationStage, UserFields


def _code_other_than(token: str) -> str:
    """Return a code that is guaranteed to differ from ``token``.

    Tests that need a *wrong* code must not hard-code one, because the
    generated token could happen to be that exact value.
    """
    return "000000" if token != "000000" else "111111"  # nosec


class DeviceClassesHelperTests(TestCase):
    """Tests for the DeviceClasses.from_model_label helper."""

    def test_from_model_label_all_classes(self):
        """Every listed model label maps to the expected DeviceClasses member."""
        cases = {
            "authentik_stages_authenticator_email.emaildevice": DeviceClasses.EMAIL,
            "authentik_stages_authenticator_sms.smsdevice": DeviceClasses.SMS,
            "authentik_stages_authenticator_totp.totpdevice": DeviceClasses.TOTP,
            "authentik_stages_authenticator_static.staticdevice": DeviceClasses.STATIC,
            "authentik_stages_authenticator_duo.duodevice": DeviceClasses.DUO,
            "authentik_stages_authenticator_webauthn.webauthndevice": DeviceClasses.WEBAUTHN,
        }
        for label, expected in cases.items():
            # subTest reports each label separately instead of stopping at the first failure.
            with self.subTest(label=label):
                self.assertEqual(DeviceClasses.from_model_label(label), expected)


class AuthenticatorValidateStageFactorTests(TestCase):
    """Tests for AuthenticatorValidateStage.get_throttling_factor."""

    def test_per_class_factors_returned(self):
        """Each OTP device class returns the factor stored in its own stage field."""
        stage = AuthenticatorValidateStage.objects.create(
            name=generate_id(),
            email_otp_throttling_factor=5,
            sms_otp_throttling_factor=6,
            totp_otp_throttling_factor=7,
            static_otp_throttling_factor=8,
        )
        self.assertEqual(stage.get_throttling_factor(DeviceClasses.EMAIL), 5)
        self.assertEqual(stage.get_throttling_factor(DeviceClasses.SMS), 6)
        self.assertEqual(stage.get_throttling_factor(DeviceClasses.TOTP), 7)
        self.assertEqual(stage.get_throttling_factor(DeviceClasses.STATIC), 8)

    def test_no_factor_for_webauthn_or_duo(self):
        """WebAuthn and Duo have no throttling factor, so the lookup returns None."""
        stage = AuthenticatorValidateStage.objects.create(name=generate_id())
        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.WEBAUTHN))
        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.DUO))


class ValidateChallengeCodeThrottlingTests(FlowTestCase):
    """Tests for validate_challenge_code throttling behavior."""

    def setUp(self) -> None:
        """Create the test user, a request factory and the email/SMS setup stages."""
        super().setUp()
        self.user = create_test_admin_user()
        self.request_factory = RequestFactory()
        self.email_stage = AuthenticatorEmailStage.objects.create(
            name="email-stage-validate-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            token_expiry="minutes=30",
        )  # nosec
        self.sms_stage = AuthenticatorSMSStage.objects.create(
            name="sms-stage-validate-throttle",
            provider=SMSProviders.GENERIC,
            from_number="1234",
        )

    def _validate_stage(self, **factors) -> AuthenticatorValidateStage:
        """Create a validate stage accepting email, SMS, TOTP and static devices.

        ``factors`` are passed straight to the model constructor, e.g.
        ``email_otp_throttling_factor=3``.
        """
        return AuthenticatorValidateStage.objects.create(
            name=generate_id(),
            device_classes=[
                DeviceClasses.EMAIL,
                DeviceClasses.SMS,
                DeviceClasses.TOTP,
                DeviceClasses.STATIC,
            ],
            **factors,
        )

    def _stage_view(self, validate_stage: AuthenticatorValidateStage) -> StageView:
        """Build a StageView around ``validate_stage`` with a plain GET request."""
        request = self.request_factory.get("/")
        return StageView(FlowExecutorView(current_stage=validate_stage), request=request)

    def _email_device(self, email: str = "throttle@authentik.local") -> EmailDevice:
        """Create a confirmed email device for the test user."""
        return EmailDevice.objects.create(
            user=self.user,
            stage=self.email_stage,
            confirmed=True,
            email=email,
        )

    def _sms_device(self, phone_number: str = "+15551230101") -> SMSDevice:
        """Create a confirmed SMS device for the test user."""
        return SMSDevice.objects.create(
            user=self.user,
            stage=self.sms_stage,
            confirmed=True,
            phone_number=phone_number,
        )

    def test_stage_factor_applied_to_email_device(self):
        """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
        stage = self._validate_stage(email_otp_throttling_factor=3)
        device = self._email_device()
        device.generate_token()
        # The wrong code must never collide with the freshly generated token.
        invalid_token = _code_other_than(device.token)
        with self.assertRaises(ValidationError):
            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        device.refresh_from_db()
        self.assertEqual(device.throttling_failure_count, 1)
        # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
        # NOTE(review): the factor is set explicitly on the reloaded instance, so the
        # delay assertion exercises the device-side computation - confirm whether the
        # factor applied by validate_challenge_code is expected to persist.
        device.set_throttle_factor(3)
        allowed, data = device.verify_is_allowed()
        self.assertFalse(allowed)
        required = data["locked_until"] - device.throttling_failure_timestamp
        self.assertAlmostEqual(required.total_seconds(), 3, places=3)

    def test_factor_zero_disables_throttling_end_to_end(self):
        """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
        stage = self._validate_stage(email_otp_throttling_factor=0)
        device = self._email_device()
        device.generate_token()
        token = device.token
        # The wrong code must never collide with the freshly generated token.
        invalid_token = _code_other_than(token)
        for _ in range(10):
            with self.assertRaises(ValidationError):
                validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        # After ten failures the correct token is still accepted immediately.
        matched = validate_challenge_code(token, self._stage_view(stage), self.user)
        self.assertEqual(matched.pk, device.pk)

    def test_lockout_persists_across_calls(self):
        """
        A correct token on the second call is still blocked and does not increment the counter.
        """
        stage = self._validate_stage(email_otp_throttling_factor=1)
        device = self._email_device()
        device.generate_token()
        token = device.token
        invalid_token = _code_other_than(token)
        with self.assertRaises(ValidationError):
            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        # Immediately try with the correct token: lockout still active, attempt must be rejected.
        with self.assertRaises(ValidationError):
            validate_challenge_code(token, self._stage_view(stage), self.user)
        device.refresh_from_db()
        # Token wasn't consumed (verification never ran), and counter didn't get incremented.
        self.assertEqual(device.token, token)
        self.assertEqual(device.throttling_failure_count, 1)


class ValidateStageThrottlingFlowTests(FlowTestCase):
    """End-to-end lockout behavior through the flow executor HTTP API."""

    def setUp(self) -> None:
        """Build a flow of identification (order 0) followed by email-only validation (order 1)."""
        super().setUp()
        self.user = create_test_admin_user()
        self.email_stage = AuthenticatorEmailStage.objects.create(
            name="email-stage-flow-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            token_expiry="minutes=30",
        )  # nosec
        self.ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
        )
        self.validate_stage = AuthenticatorValidateStage.objects.create(
            name=generate_id(),
            device_classes=[DeviceClasses.EMAIL],
            email_otp_throttling_factor=1,
        )
        self.flow = create_test_flow()
        FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
        FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)

    def _identify(self):
        """Submit the test user's username to the flow executor and expect HTTP 200."""
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"uid_field": self.user.username},
            follow=True,
        )
        self.assertEqual(response.status_code, 200)

    def _select_email(self, device: EmailDevice):
        """Post a challenge selection for ``device``; the response is not inspected."""
        self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {
                "component": "ak-stage-authenticator-validate",
                "selected_challenge": {
                    "device_class": "email",
                    "device_uid": str(device.pk),
                    "challenge": {},
                    "last_used": None,
                },
            },
        )

    def test_bad_code_then_correct_code_is_still_blocked(self):
        """After a bad code over HTTP, a subsequent correct code is still rejected
        because the lockout persists in the database."""
        device = EmailDevice.objects.create(
            user=self.user,
            confirmed=True,
            stage=self.email_stage,
            email="throttle-flow@authentik.local",
        )
        self._identify()
        self._select_email(device)
        # Server generated and stored the token - grab it from DB.
        device.refresh_from_db()
        token = device.token
        # The wrong code must never collide with the server-generated token.
        bad_code = _code_other_than(token)
        # First attempt: bad code - must increment the DB counter.
        self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"component": "ak-stage-authenticator-validate", "code": bad_code},
        )
        device.refresh_from_db()
        self.assertEqual(device.throttling_failure_count, 1)
        self.assertEqual(device.token, token)
        # Second attempt with the correct token - still blocked.
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"component": "ak-stage-authenticator-validate", "code": token},
        )
        self.assertStageResponse(
            response,
            flow=self.flow,
            component="ak-stage-authenticator-validate",
        )
        device.refresh_from_db()
        # Counter wasn't incremented on a blocked attempt
        self.assertEqual(device.throttling_failure_count, 1)
        # Token wasn't consumed.
        self.assertEqual(device.token, token)
class
DeviceClassesHelperTests(django.test.testcases.TestCase):
class DeviceClassesHelperTests(TestCase):
    """Exercise the ``DeviceClasses.from_model_label`` lookup helper."""

    def test_from_model_label_all_classes(self):
        """Every listed model label must resolve to the expected DeviceClasses member."""
        label_class_pairs = (
            ("authentik_stages_authenticator_email.emaildevice", DeviceClasses.EMAIL),
            ("authentik_stages_authenticator_sms.smsdevice", DeviceClasses.SMS),
            ("authentik_stages_authenticator_totp.totpdevice", DeviceClasses.TOTP),
            ("authentik_stages_authenticator_static.staticdevice", DeviceClasses.STATIC),
            ("authentik_stages_authenticator_duo.duodevice", DeviceClasses.DUO),
            ("authentik_stages_authenticator_webauthn.webauthndevice", DeviceClasses.WEBAUTHN),
        )
        for model_label, device_class in label_class_pairs:
            # One sub-test per label so a single mismatch does not hide the others.
            with self.subTest(label=model_label):
                resolved = DeviceClasses.from_model_label(model_label)
                self.assertEqual(resolved, device_class)
Tests for the DeviceClasses.from_model_label helper.
def
test_from_model_label_all_classes(self):
def test_from_model_label_all_classes(self):
    """Every listed model label must resolve to the expected DeviceClasses member."""
    label_class_pairs = (
        ("authentik_stages_authenticator_email.emaildevice", DeviceClasses.EMAIL),
        ("authentik_stages_authenticator_sms.smsdevice", DeviceClasses.SMS),
        ("authentik_stages_authenticator_totp.totpdevice", DeviceClasses.TOTP),
        ("authentik_stages_authenticator_static.staticdevice", DeviceClasses.STATIC),
        ("authentik_stages_authenticator_duo.duodevice", DeviceClasses.DUO),
        ("authentik_stages_authenticator_webauthn.webauthndevice", DeviceClasses.WEBAUTHN),
    )
    for model_label, device_class in label_class_pairs:
        # One sub-test per label so a single mismatch does not hide the others.
        with self.subTest(label=model_label):
            resolved = DeviceClasses.from_model_label(model_label)
            self.assertEqual(resolved, device_class)
class
AuthenticatorValidateStageFactorTests(django.test.testcases.TestCase):
class AuthenticatorValidateStageFactorTests(TestCase):
    """Checks for ``AuthenticatorValidateStage.get_throttling_factor``."""

    def test_per_class_factors_returned(self):
        """Each OTP device class returns the factor stored in its own stage field."""
        configured_factors = {
            "email_otp_throttling_factor": 5,
            "sms_otp_throttling_factor": 6,
            "totp_otp_throttling_factor": 7,
            "static_otp_throttling_factor": 8,
        }
        validate_stage = AuthenticatorValidateStage.objects.create(
            name=generate_id(), **configured_factors
        )
        expected_by_class = (
            (DeviceClasses.EMAIL, 5),
            (DeviceClasses.SMS, 6),
            (DeviceClasses.TOTP, 7),
            (DeviceClasses.STATIC, 8),
        )
        for device_class, factor in expected_by_class:
            self.assertEqual(validate_stage.get_throttling_factor(device_class), factor)

    def test_no_factor_for_webauthn_or_duo(self):
        """WebAuthn and Duo have no throttling factor, so the lookup returns None."""
        validate_stage = AuthenticatorValidateStage.objects.create(name=generate_id())
        for device_class in (DeviceClasses.WEBAUTHN, DeviceClasses.DUO):
            self.assertIsNone(validate_stage.get_throttling_factor(device_class))
Tests for AuthenticatorValidateStage.get_throttling_factor.
def
test_per_class_factors_returned(self):
def test_per_class_factors_returned(self):
    """Each OTP device class returns the factor stored in its own stage field."""
    configured_factors = {
        "email_otp_throttling_factor": 5,
        "sms_otp_throttling_factor": 6,
        "totp_otp_throttling_factor": 7,
        "static_otp_throttling_factor": 8,
    }
    validate_stage = AuthenticatorValidateStage.objects.create(
        name=generate_id(), **configured_factors
    )
    expected_by_class = (
        (DeviceClasses.EMAIL, 5),
        (DeviceClasses.SMS, 6),
        (DeviceClasses.TOTP, 7),
        (DeviceClasses.STATIC, 8),
    )
    for device_class, factor in expected_by_class:
        self.assertEqual(validate_stage.get_throttling_factor(device_class), factor)
class ValidateChallengeCodeThrottlingTests(FlowTestCase):
    """Tests for validate_challenge_code throttling behavior."""

    def setUp(self) -> None:
        """Create the test user, a request factory and the email/SMS setup stages."""
        super().setUp()
        self.user = create_test_admin_user()
        self.request_factory = RequestFactory()
        self.email_stage = AuthenticatorEmailStage.objects.create(
            name="email-stage-validate-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            token_expiry="minutes=30",
        )  # nosec
        self.sms_stage = AuthenticatorSMSStage.objects.create(
            name="sms-stage-validate-throttle",
            provider=SMSProviders.GENERIC,
            from_number="1234",
        )

    @staticmethod
    def _invalid_code(token: str) -> str:
        """Return a code that is guaranteed to differ from ``token``.

        A hard-coded wrong code could coincide with the generated token and
        make a test fail spuriously, so every test derives it from the token.
        """
        return "000000" if token != "000000" else "111111"  # nosec

    def _validate_stage(self, **factors) -> AuthenticatorValidateStage:
        """Create a validate stage accepting email, SMS, TOTP and static devices.

        ``factors`` are passed straight to the model constructor, e.g.
        ``email_otp_throttling_factor=3``.
        """
        return AuthenticatorValidateStage.objects.create(
            name=generate_id(),
            device_classes=[
                DeviceClasses.EMAIL,
                DeviceClasses.SMS,
                DeviceClasses.TOTP,
                DeviceClasses.STATIC,
            ],
            **factors,
        )

    def _stage_view(self, validate_stage: AuthenticatorValidateStage) -> StageView:
        """Build a StageView around ``validate_stage`` with a plain GET request."""
        request = self.request_factory.get("/")
        return StageView(FlowExecutorView(current_stage=validate_stage), request=request)

    def _email_device(self, email: str = "throttle@authentik.local") -> EmailDevice:
        """Create a confirmed email device for the test user."""
        return EmailDevice.objects.create(
            user=self.user,
            stage=self.email_stage,
            confirmed=True,
            email=email,
        )

    def _sms_device(self, phone_number: str = "+15551230101") -> SMSDevice:
        """Create a confirmed SMS device for the test user."""
        return SMSDevice.objects.create(
            user=self.user,
            stage=self.sms_stage,
            confirmed=True,
            phone_number=phone_number,
        )

    def test_stage_factor_applied_to_email_device(self):
        """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
        stage = self._validate_stage(email_otp_throttling_factor=3)
        device = self._email_device()
        device.generate_token()
        # The wrong code must never collide with the freshly generated token.
        invalid_token = self._invalid_code(device.token)
        with self.assertRaises(ValidationError):
            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        device.refresh_from_db()
        self.assertEqual(device.throttling_failure_count, 1)
        # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
        # NOTE(review): the factor is set explicitly on the reloaded instance, so the
        # delay assertion exercises the device-side computation - confirm whether the
        # factor applied by validate_challenge_code is expected to persist.
        device.set_throttle_factor(3)
        allowed, data = device.verify_is_allowed()
        self.assertFalse(allowed)
        required = data["locked_until"] - device.throttling_failure_timestamp
        self.assertAlmostEqual(required.total_seconds(), 3, places=3)

    def test_factor_zero_disables_throttling_end_to_end(self):
        """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
        stage = self._validate_stage(email_otp_throttling_factor=0)
        device = self._email_device()
        device.generate_token()
        token = device.token
        # The wrong code must never collide with the freshly generated token.
        invalid_token = self._invalid_code(token)
        for _ in range(10):
            with self.assertRaises(ValidationError):
                validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        # After ten failures the correct token is still accepted immediately.
        matched = validate_challenge_code(token, self._stage_view(stage), self.user)
        self.assertEqual(matched.pk, device.pk)

    def test_lockout_persists_across_calls(self):
        """
        A correct token on the second call is still blocked and does not increment the counter.
        """
        stage = self._validate_stage(email_otp_throttling_factor=1)
        device = self._email_device()
        device.generate_token()
        token = device.token
        invalid_token = self._invalid_code(token)
        with self.assertRaises(ValidationError):
            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
        # Immediately try with the correct token: lockout still active, attempt must be rejected.
        with self.assertRaises(ValidationError):
            validate_challenge_code(token, self._stage_view(stage), self.user)
        device.refresh_from_db()
        # Token wasn't consumed (verification never ran), and counter didn't get incremented.
        self.assertEqual(device.token, token)
        self.assertEqual(device.throttling_failure_count, 1)
Tests for validate_challenge_code throttling behavior.
def
setUp(self) -> None:
def setUp(self) -> None:
    """Create the test user, a request factory and the email/SMS setup stages."""
    super().setUp()
    self.user = create_test_admin_user()
    self.request_factory = RequestFactory()

    # Email authenticator setup stage that the email devices are attached to.
    email_stage_fields = {
        "name": "email-stage-validate-throttle",
        "use_global_settings": True,
        "from_address": "test@authentik.local",
        "token_expiry": "minutes=30",  # nosec
    }
    self.email_stage = AuthenticatorEmailStage.objects.create(**email_stage_fields)

    # SMS authenticator setup stage that the SMS devices are attached to.
    sms_stage_fields = {
        "name": "sms-stage-validate-throttle",
        "provider": SMSProviders.GENERIC,
        "from_number": "1234",
    }
    self.sms_stage = AuthenticatorSMSStage.objects.create(**sms_stage_fields)
Hook method for setting up the test fixture before exercising it.
def
test_stage_factor_applied_to_email_device(self):
def test_stage_factor_applied_to_email_device(self):
    """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
    stage = self._validate_stage(email_otp_throttling_factor=3)
    device = self._email_device()
    device.generate_token()
    # The wrong code must never collide with the freshly generated token,
    # otherwise validation would succeed and the test would fail spuriously.
    invalid_token = "000000" if device.token != "000000" else "111111"  # nosec
    with self.assertRaises(ValidationError):
        validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
    device.refresh_from_db()
    self.assertEqual(device.throttling_failure_count, 1)
    # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
    # NOTE(review): the factor is set explicitly on the reloaded instance, so the
    # delay assertion exercises the device-side computation - confirm whether the
    # factor applied by validate_challenge_code is expected to persist.
    device.set_throttle_factor(3)
    allowed, data = device.verify_is_allowed()
    self.assertFalse(allowed)
    required = data["locked_until"] - device.throttling_failure_timestamp
    self.assertAlmostEqual(required.total_seconds(), 3, places=3)
The stage's email_otp_throttling_factor is pushed onto the device before verify.
def
test_factor_zero_disables_throttling_end_to_end(self):
def test_factor_zero_disables_throttling_end_to_end(self):
    """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
    stage = self._validate_stage(email_otp_throttling_factor=0)
    device = self._email_device()
    device.generate_token()
    token = device.token
    # The wrong code must never collide with the freshly generated token,
    # otherwise validation would succeed and the test would fail spuriously.
    invalid_token = "000000" if token != "000000" else "111111"  # nosec
    for _ in range(10):
        with self.assertRaises(ValidationError):
            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
    # After ten failures the correct token is still accepted immediately.
    matched = validate_challenge_code(token, self._stage_view(stage), self.user)
    self.assertEqual(matched.pk, device.pk)
With email_otp_throttling_factor=0, repeated failures do not lock the device.
def
test_lockout_persists_across_calls(self):
def test_lockout_persists_across_calls(self):
    """
    After one failed attempt, an immediate retry with the valid token is rejected
    and changes neither the stored token nor the failure counter.
    """
    validate_stage = self._validate_stage(email_otp_throttling_factor=1)
    email_device = self._email_device()
    email_device.generate_token()
    good_code = email_device.token
    # Pick a wrong code that cannot collide with the generated token.
    if good_code != "000000":
        bad_code = "000000"  # nosec
    else:
        bad_code = "111111"  # nosec
    # First the wrong code, then - while the lockout is still active - the right
    # one: both attempts must be rejected.
    for submitted_code in (bad_code, good_code):
        with self.assertRaises(ValidationError):
            validate_challenge_code(
                submitted_code, self._stage_view(validate_stage), self.user
            )
    email_device.refresh_from_db()
    # The token is still stored and only the first failure was counted.
    self.assertEqual(email_device.token, good_code)
    self.assertEqual(email_device.throttling_failure_count, 1)
A correct token on the second call is still blocked and does not increment the counter.
class ValidateStageThrottlingFlowTests(FlowTestCase):
    """End-to-end lockout behavior through the flow executor HTTP API."""

    def setUp(self) -> None:
        """Build a flow of identification (order 0) followed by email-only validation (order 1)."""
        super().setUp()
        self.user = create_test_admin_user()
        self.email_stage = AuthenticatorEmailStage.objects.create(
            name="email-stage-flow-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            token_expiry="minutes=30",
        )  # nosec
        self.ident_stage = IdentificationStage.objects.create(
            name=generate_id(),
            user_fields=[UserFields.USERNAME],
        )
        self.validate_stage = AuthenticatorValidateStage.objects.create(
            name=generate_id(),
            device_classes=[DeviceClasses.EMAIL],
            email_otp_throttling_factor=1,
        )
        self.flow = create_test_flow()
        FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
        FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)

    def _identify(self):
        """Submit the test user's username to the flow executor and expect HTTP 200."""
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"uid_field": self.user.username},
            follow=True,
        )
        self.assertEqual(response.status_code, 200)

    def _select_email(self, device: EmailDevice):
        """Post a challenge selection for ``device``; the response is not inspected."""
        self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {
                "component": "ak-stage-authenticator-validate",
                "selected_challenge": {
                    "device_class": "email",
                    "device_uid": str(device.pk),
                    "challenge": {},
                    "last_used": None,
                },
            },
        )

    def test_bad_code_then_correct_code_is_still_blocked(self):
        """After a bad code over HTTP, a subsequent correct code is still rejected
        because the lockout persists in the database."""
        device = EmailDevice.objects.create(
            user=self.user,
            confirmed=True,
            stage=self.email_stage,
            email="throttle-flow@authentik.local",
        )
        self._identify()
        self._select_email(device)
        # Server generated and stored the token - grab it from DB.
        device.refresh_from_db()
        token = device.token
        # The wrong code must never collide with the server-generated token,
        # otherwise the first attempt would succeed and the test would fail spuriously.
        bad_code = "000000" if token != "000000" else "111111"  # nosec
        # First attempt: bad code - must increment the DB counter.
        self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"component": "ak-stage-authenticator-validate", "code": bad_code},
        )
        device.refresh_from_db()
        self.assertEqual(device.throttling_failure_count, 1)
        self.assertEqual(device.token, token)
        # Second attempt with the correct token - still blocked.
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            {"component": "ak-stage-authenticator-validate", "code": token},
        )
        self.assertStageResponse(
            response,
            flow=self.flow,
            component="ak-stage-authenticator-validate",
        )
        device.refresh_from_db()
        # Counter wasn't incremented on a blocked attempt
        self.assertEqual(device.throttling_failure_count, 1)
        # Token wasn't consumed.
        self.assertEqual(device.token, token)
End-to-end lockout behavior through the flow executor HTTP API.
def
setUp(self) -> None:
def setUp(self) -> None:
    """Build a flow of identification (order 0) followed by email-only validation (order 1)."""
    super().setUp()
    self.user = create_test_admin_user()

    # Email authenticator setup stage that the test device is attached to.
    email_stage_fields = {
        "name": "email-stage-flow-throttle",
        "use_global_settings": True,
        "from_address": "test@authentik.local",
        "token_expiry": "minutes=30",  # nosec
    }
    self.email_stage = AuthenticatorEmailStage.objects.create(**email_stage_fields)

    self.ident_stage = IdentificationStage.objects.create(
        name=generate_id(), user_fields=[UserFields.USERNAME]
    )
    self.validate_stage = AuthenticatorValidateStage.objects.create(
        name=generate_id(),
        device_classes=[DeviceClasses.EMAIL],
        email_otp_throttling_factor=1,
    )

    self.flow = create_test_flow()
    # Bind the stages in execution order: identification first, then validation.
    ordered_stages = (self.ident_stage, self.validate_stage)
    for position, bound_stage in enumerate(ordered_stages):
        FlowStageBinding.objects.create(target=self.flow, stage=bound_stage, order=position)
Hook method for setting up the test fixture before exercising it.
def
test_bad_code_then_correct_code_is_still_blocked(self):
def test_bad_code_then_correct_code_is_still_blocked(self):
    """After a bad code over HTTP, a subsequent correct code is still rejected
    because the lockout persists in the database."""
    device = EmailDevice.objects.create(
        user=self.user,
        confirmed=True,
        stage=self.email_stage,
        email="throttle-flow@authentik.local",
    )
    self._identify()
    self._select_email(device)
    # Server generated and stored the token - grab it from DB.
    device.refresh_from_db()
    token = device.token
    # The wrong code must never collide with the server-generated token,
    # otherwise the first attempt would succeed and the test would fail spuriously.
    bad_code = "000000" if token != "000000" else "111111"  # nosec
    # First attempt: bad code - must increment the DB counter.
    self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        {"component": "ak-stage-authenticator-validate", "code": bad_code},
    )
    device.refresh_from_db()
    self.assertEqual(device.throttling_failure_count, 1)
    self.assertEqual(device.token, token)
    # Second attempt with the correct token - still blocked.
    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        {"component": "ak-stage-authenticator-validate", "code": token},
    )
    self.assertStageResponse(
        response,
        flow=self.flow,
        component="ak-stage-authenticator-validate",
    )
    device.refresh_from_db()
    # Counter wasn't incremented on a blocked attempt
    self.assertEqual(device.throttling_failure_count, 1)
    # Token wasn't consumed.
    self.assertEqual(device.token, token)
After a bad code over HTTP, a subsequent correct code is still rejected because the lockout persists in the database.