authentik.stages.authenticator_validate.tests.test_throttling

  1from django.test import TestCase
  2from django.test.client import RequestFactory
  3from django.urls.base import reverse
  4from rest_framework.exceptions import ValidationError
  5
  6from authentik.core.tests.utils import create_test_admin_user, create_test_flow
  7from authentik.flows.models import FlowStageBinding
  8from authentik.flows.stage import StageView
  9from authentik.flows.tests import FlowTestCase
 10from authentik.flows.views.executor import FlowExecutorView
 11from authentik.lib.generators import generate_id
 12from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
 13from authentik.stages.authenticator_sms.models import (
 14    AuthenticatorSMSStage,
 15    SMSDevice,
 16    SMSProviders,
 17)
 18from authentik.stages.authenticator_validate.challenge import validate_challenge_code
 19from authentik.stages.authenticator_validate.models import (
 20    AuthenticatorValidateStage,
 21    DeviceClasses,
 22)
 23from authentik.stages.identification.models import IdentificationStage, UserFields
 24
 25
 26class DeviceClassesHelperTests(TestCase):
 27    """Tests for the DeviceClasses.from_model_label helper."""
 28
 29    def test_from_model_label_all_classes(self):
 30        cases = {
 31            "authentik_stages_authenticator_email.emaildevice": DeviceClasses.EMAIL,
 32            "authentik_stages_authenticator_sms.smsdevice": DeviceClasses.SMS,
 33            "authentik_stages_authenticator_totp.totpdevice": DeviceClasses.TOTP,
 34            "authentik_stages_authenticator_static.staticdevice": DeviceClasses.STATIC,
 35            "authentik_stages_authenticator_duo.duodevice": DeviceClasses.DUO,
 36            "authentik_stages_authenticator_webauthn.webauthndevice": DeviceClasses.WEBAUTHN,
 37        }
 38        for label, expected in cases.items():
 39            with self.subTest(label=label):
 40                self.assertEqual(DeviceClasses.from_model_label(label), expected)
 41
 42
 43class AuthenticatorValidateStageFactorTests(TestCase):
 44    """Tests for AuthenticatorValidateStage.get_throttling_factor."""
 45
 46    def test_per_class_factors_returned(self):
 47        stage = AuthenticatorValidateStage.objects.create(
 48            name=generate_id(),
 49            email_otp_throttling_factor=5,
 50            sms_otp_throttling_factor=6,
 51            totp_otp_throttling_factor=7,
 52            static_otp_throttling_factor=8,
 53        )
 54        self.assertEqual(stage.get_throttling_factor(DeviceClasses.EMAIL), 5)
 55        self.assertEqual(stage.get_throttling_factor(DeviceClasses.SMS), 6)
 56        self.assertEqual(stage.get_throttling_factor(DeviceClasses.TOTP), 7)
 57        self.assertEqual(stage.get_throttling_factor(DeviceClasses.STATIC), 8)
 58
 59    def test_no_factor_for_webauthn_or_duo(self):
 60        stage = AuthenticatorValidateStage.objects.create(name=generate_id())
 61        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.WEBAUTHN))
 62        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.DUO))
 63
 64
 65class ValidateChallengeCodeThrottlingTests(FlowTestCase):
 66    """Tests for validate_challenge_code throttling behavior."""
 67
 68    def setUp(self) -> None:
 69        super().setUp()
 70        self.user = create_test_admin_user()
 71        self.request_factory = RequestFactory()
 72        self.email_stage = AuthenticatorEmailStage.objects.create(
 73            name="email-stage-validate-throttle",
 74            use_global_settings=True,
 75            from_address="test@authentik.local",
 76            token_expiry="minutes=30",
 77        )  # nosec
 78        self.sms_stage = AuthenticatorSMSStage.objects.create(
 79            name="sms-stage-validate-throttle",
 80            provider=SMSProviders.GENERIC,
 81            from_number="1234",
 82        )
 83
 84    def _validate_stage(self, **factors) -> AuthenticatorValidateStage:
 85        return AuthenticatorValidateStage.objects.create(
 86            name=generate_id(),
 87            device_classes=[
 88                DeviceClasses.EMAIL,
 89                DeviceClasses.SMS,
 90                DeviceClasses.TOTP,
 91                DeviceClasses.STATIC,
 92            ],
 93            **factors,
 94        )
 95
 96    def _stage_view(self, validate_stage: AuthenticatorValidateStage) -> StageView:
 97        request = self.request_factory.get("/")
 98        return StageView(FlowExecutorView(current_stage=validate_stage), request=request)
 99
100    def _email_device(self, email: str = "throttle@authentik.local") -> EmailDevice:
101        return EmailDevice.objects.create(
102            user=self.user,
103            stage=self.email_stage,
104            confirmed=True,
105            email=email,
106        )
107
108    def _sms_device(self, phone_number: str = "+15551230101") -> SMSDevice:
109        return SMSDevice.objects.create(
110            user=self.user,
111            stage=self.sms_stage,
112            confirmed=True,
113            phone_number=phone_number,
114        )
115
116    def test_stage_factor_applied_to_email_device(self):
117        """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
118        stage = self._validate_stage(email_otp_throttling_factor=3)
119        device = self._email_device()
120        device.generate_token()
121        with self.assertRaises(ValidationError):
122            validate_challenge_code("000000", self._stage_view(stage), self.user)
123        device.refresh_from_db()
124        self.assertEqual(device.throttling_failure_count, 1)
125        # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
126        device.set_throttle_factor(3)
127        allowed, data = device.verify_is_allowed()
128        self.assertFalse(allowed)
129        required = data["locked_until"] - device.throttling_failure_timestamp
130        self.assertAlmostEqual(required.total_seconds(), 3, places=3)
131
132    def test_factor_zero_disables_throttling_end_to_end(self):
133        """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
134        stage = self._validate_stage(email_otp_throttling_factor=0)
135        device = self._email_device()
136        device.generate_token()
137        token = device.token
138        for _ in range(10):
139            with self.assertRaises(ValidationError):
140                validate_challenge_code("000000", self._stage_view(stage), self.user)
141        matched = validate_challenge_code(token, self._stage_view(stage), self.user)
142        self.assertEqual(matched.pk, device.pk)
143
144    def test_lockout_persists_across_calls(self):
145        """
146        A correct token on the second call is still blocked and does not increment the counter.
147        """
148        stage = self._validate_stage(email_otp_throttling_factor=1)
149        device = self._email_device()
150        device.generate_token()
151        token = device.token
152        invalid_token = "000000" if token != "000000" else "111111"  # nosec
153        with self.assertRaises(ValidationError):
154            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
155        # Immediately try with the correct token: lockout still active, attempt must be rejected.
156        with self.assertRaises(ValidationError):
157            validate_challenge_code(token, self._stage_view(stage), self.user)
158        device.refresh_from_db()
159        # Token wasn't consumed (verification never ran), and counter didn't get incremented.
160        self.assertEqual(device.token, token)
161        self.assertEqual(device.throttling_failure_count, 1)
162
163
164class ValidateStageThrottlingFlowTests(FlowTestCase):
165    """End-to-end lockout behavior through the flow executor HTTP API."""
166
167    def setUp(self) -> None:
168        super().setUp()
169        self.user = create_test_admin_user()
170        self.email_stage = AuthenticatorEmailStage.objects.create(
171            name="email-stage-flow-throttle",
172            use_global_settings=True,
173            from_address="test@authentik.local",
174            token_expiry="minutes=30",
175        )  # nosec
176        self.ident_stage = IdentificationStage.objects.create(
177            name=generate_id(),
178            user_fields=[UserFields.USERNAME],
179        )
180        self.validate_stage = AuthenticatorValidateStage.objects.create(
181            name=generate_id(),
182            device_classes=[DeviceClasses.EMAIL],
183            email_otp_throttling_factor=1,
184        )
185        self.flow = create_test_flow()
186        FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
187        FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)
188
189    def _identify(self):
190        response = self.client.post(
191            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
192            {"uid_field": self.user.username},
193            follow=True,
194        )
195        self.assertEqual(response.status_code, 200)
196
197    def _select_email(self, device: EmailDevice):
198        self.client.post(
199            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
200            {
201                "component": "ak-stage-authenticator-validate",
202                "selected_challenge": {
203                    "device_class": "email",
204                    "device_uid": str(device.pk),
205                    "challenge": {},
206                    "last_used": None,
207                },
208            },
209        )
210
211    def test_bad_code_then_correct_code_is_still_blocked(self):
212        """After a bad code over HTTP, a subsequent correct code is still rejected
213        because the lockout persists in the database."""
214        device = EmailDevice.objects.create(
215            user=self.user,
216            confirmed=True,
217            stage=self.email_stage,
218            email="throttle-flow@authentik.local",
219        )
220        self._identify()
221        self._select_email(device)
222        # Server generated and stored the token - grab it from DB.
223        device.refresh_from_db()
224        token = device.token
225        # First attempt: bad code - must increment the DB counter.
226        self.client.post(
227            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
228            {"component": "ak-stage-authenticator-validate", "code": "000000"},
229        )
230        device.refresh_from_db()
231        self.assertEqual(device.throttling_failure_count, 1)
232        self.assertEqual(device.token, token)
233        # Second attempt with the correct token - still blocked.
234        response = self.client.post(
235            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
236            {"component": "ak-stage-authenticator-validate", "code": token},
237        )
238        self.assertStageResponse(
239            response,
240            flow=self.flow,
241            component="ak-stage-authenticator-validate",
242        )
243        device.refresh_from_db()
244        # Counter wasn't incremented on a blocked attempt
245        self.assertEqual(device.throttling_failure_count, 1)
246        # Token wasn't consumed.
247        self.assertEqual(device.token, token)
class DeviceClassesHelperTests(django.test.testcases.TestCase):
27class DeviceClassesHelperTests(TestCase):
28    """Tests for the DeviceClasses.from_model_label helper."""
29
30    def test_from_model_label_all_classes(self):
31        cases = {
32            "authentik_stages_authenticator_email.emaildevice": DeviceClasses.EMAIL,
33            "authentik_stages_authenticator_sms.smsdevice": DeviceClasses.SMS,
34            "authentik_stages_authenticator_totp.totpdevice": DeviceClasses.TOTP,
35            "authentik_stages_authenticator_static.staticdevice": DeviceClasses.STATIC,
36            "authentik_stages_authenticator_duo.duodevice": DeviceClasses.DUO,
37            "authentik_stages_authenticator_webauthn.webauthndevice": DeviceClasses.WEBAUTHN,
38        }
39        for label, expected in cases.items():
40            with self.subTest(label=label):
41                self.assertEqual(DeviceClasses.from_model_label(label), expected)

Tests for the DeviceClasses.from_model_label helper.

def test_from_model_label_all_classes(self):
30    def test_from_model_label_all_classes(self):
31        cases = {
32            "authentik_stages_authenticator_email.emaildevice": DeviceClasses.EMAIL,
33            "authentik_stages_authenticator_sms.smsdevice": DeviceClasses.SMS,
34            "authentik_stages_authenticator_totp.totpdevice": DeviceClasses.TOTP,
35            "authentik_stages_authenticator_static.staticdevice": DeviceClasses.STATIC,
36            "authentik_stages_authenticator_duo.duodevice": DeviceClasses.DUO,
37            "authentik_stages_authenticator_webauthn.webauthndevice": DeviceClasses.WEBAUTHN,
38        }
39        for label, expected in cases.items():
40            with self.subTest(label=label):
41                self.assertEqual(DeviceClasses.from_model_label(label), expected)
class AuthenticatorValidateStageFactorTests(django.test.testcases.TestCase):
44class AuthenticatorValidateStageFactorTests(TestCase):
45    """Tests for AuthenticatorValidateStage.get_throttling_factor."""
46
47    def test_per_class_factors_returned(self):
48        stage = AuthenticatorValidateStage.objects.create(
49            name=generate_id(),
50            email_otp_throttling_factor=5,
51            sms_otp_throttling_factor=6,
52            totp_otp_throttling_factor=7,
53            static_otp_throttling_factor=8,
54        )
55        self.assertEqual(stage.get_throttling_factor(DeviceClasses.EMAIL), 5)
56        self.assertEqual(stage.get_throttling_factor(DeviceClasses.SMS), 6)
57        self.assertEqual(stage.get_throttling_factor(DeviceClasses.TOTP), 7)
58        self.assertEqual(stage.get_throttling_factor(DeviceClasses.STATIC), 8)
59
60    def test_no_factor_for_webauthn_or_duo(self):
61        stage = AuthenticatorValidateStage.objects.create(name=generate_id())
62        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.WEBAUTHN))
63        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.DUO))

Tests for AuthenticatorValidateStage.get_throttling_factor.

def test_per_class_factors_returned(self):
47    def test_per_class_factors_returned(self):
48        stage = AuthenticatorValidateStage.objects.create(
49            name=generate_id(),
50            email_otp_throttling_factor=5,
51            sms_otp_throttling_factor=6,
52            totp_otp_throttling_factor=7,
53            static_otp_throttling_factor=8,
54        )
55        self.assertEqual(stage.get_throttling_factor(DeviceClasses.EMAIL), 5)
56        self.assertEqual(stage.get_throttling_factor(DeviceClasses.SMS), 6)
57        self.assertEqual(stage.get_throttling_factor(DeviceClasses.TOTP), 7)
58        self.assertEqual(stage.get_throttling_factor(DeviceClasses.STATIC), 8)
def test_no_factor_for_webauthn_or_duo(self):
60    def test_no_factor_for_webauthn_or_duo(self):
61        stage = AuthenticatorValidateStage.objects.create(name=generate_id())
62        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.WEBAUTHN))
63        self.assertIsNone(stage.get_throttling_factor(DeviceClasses.DUO))
class ValidateChallengeCodeThrottlingTests(authentik.flows.tests.FlowTestCase):
 66class ValidateChallengeCodeThrottlingTests(FlowTestCase):
 67    """Tests for validate_challenge_code throttling behavior."""
 68
 69    def setUp(self) -> None:
 70        super().setUp()
 71        self.user = create_test_admin_user()
 72        self.request_factory = RequestFactory()
 73        self.email_stage = AuthenticatorEmailStage.objects.create(
 74            name="email-stage-validate-throttle",
 75            use_global_settings=True,
 76            from_address="test@authentik.local",
 77            token_expiry="minutes=30",
 78        )  # nosec
 79        self.sms_stage = AuthenticatorSMSStage.objects.create(
 80            name="sms-stage-validate-throttle",
 81            provider=SMSProviders.GENERIC,
 82            from_number="1234",
 83        )
 84
 85    def _validate_stage(self, **factors) -> AuthenticatorValidateStage:
 86        return AuthenticatorValidateStage.objects.create(
 87            name=generate_id(),
 88            device_classes=[
 89                DeviceClasses.EMAIL,
 90                DeviceClasses.SMS,
 91                DeviceClasses.TOTP,
 92                DeviceClasses.STATIC,
 93            ],
 94            **factors,
 95        )
 96
 97    def _stage_view(self, validate_stage: AuthenticatorValidateStage) -> StageView:
 98        request = self.request_factory.get("/")
 99        return StageView(FlowExecutorView(current_stage=validate_stage), request=request)
100
101    def _email_device(self, email: str = "throttle@authentik.local") -> EmailDevice:
102        return EmailDevice.objects.create(
103            user=self.user,
104            stage=self.email_stage,
105            confirmed=True,
106            email=email,
107        )
108
109    def _sms_device(self, phone_number: str = "+15551230101") -> SMSDevice:
110        return SMSDevice.objects.create(
111            user=self.user,
112            stage=self.sms_stage,
113            confirmed=True,
114            phone_number=phone_number,
115        )
116
117    def test_stage_factor_applied_to_email_device(self):
118        """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
119        stage = self._validate_stage(email_otp_throttling_factor=3)
120        device = self._email_device()
121        device.generate_token()
122        with self.assertRaises(ValidationError):
123            validate_challenge_code("000000", self._stage_view(stage), self.user)
124        device.refresh_from_db()
125        self.assertEqual(device.throttling_failure_count, 1)
126        # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
127        device.set_throttle_factor(3)
128        allowed, data = device.verify_is_allowed()
129        self.assertFalse(allowed)
130        required = data["locked_until"] - device.throttling_failure_timestamp
131        self.assertAlmostEqual(required.total_seconds(), 3, places=3)
132
133    def test_factor_zero_disables_throttling_end_to_end(self):
134        """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
135        stage = self._validate_stage(email_otp_throttling_factor=0)
136        device = self._email_device()
137        device.generate_token()
138        token = device.token
139        for _ in range(10):
140            with self.assertRaises(ValidationError):
141                validate_challenge_code("000000", self._stage_view(stage), self.user)
142        matched = validate_challenge_code(token, self._stage_view(stage), self.user)
143        self.assertEqual(matched.pk, device.pk)
144
145    def test_lockout_persists_across_calls(self):
146        """
147        A correct token on the second call is still blocked and does not increment the counter.
148        """
149        stage = self._validate_stage(email_otp_throttling_factor=1)
150        device = self._email_device()
151        device.generate_token()
152        token = device.token
153        invalid_token = "000000" if token != "000000" else "111111"  # nosec
154        with self.assertRaises(ValidationError):
155            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
156        # Immediately try with the correct token: lockout still active, attempt must be rejected.
157        with self.assertRaises(ValidationError):
158            validate_challenge_code(token, self._stage_view(stage), self.user)
159        device.refresh_from_db()
160        # Token wasn't consumed (verification never ran), and counter didn't get incremented.
161        self.assertEqual(device.token, token)
162        self.assertEqual(device.throttling_failure_count, 1)

Tests for validate_challenge_code throttling behavior.

def setUp(self) -> None:
69    def setUp(self) -> None:
70        super().setUp()
71        self.user = create_test_admin_user()
72        self.request_factory = RequestFactory()
73        self.email_stage = AuthenticatorEmailStage.objects.create(
74            name="email-stage-validate-throttle",
75            use_global_settings=True,
76            from_address="test@authentik.local",
77            token_expiry="minutes=30",
78        )  # nosec
79        self.sms_stage = AuthenticatorSMSStage.objects.create(
80            name="sms-stage-validate-throttle",
81            provider=SMSProviders.GENERIC,
82            from_number="1234",
83        )

Hook method for setting up the test fixture before exercising it.

def test_stage_factor_applied_to_email_device(self):
117    def test_stage_factor_applied_to_email_device(self):
118        """The stage's email_otp_throttling_factor is pushed onto the device before verify."""
119        stage = self._validate_stage(email_otp_throttling_factor=3)
120        device = self._email_device()
121        device.generate_token()
122        with self.assertRaises(ValidationError):
123            validate_challenge_code("000000", self._stage_view(stage), self.user)
124        device.refresh_from_db()
125        self.assertEqual(device.throttling_failure_count, 1)
126        # verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
127        device.set_throttle_factor(3)
128        allowed, data = device.verify_is_allowed()
129        self.assertFalse(allowed)
130        required = data["locked_until"] - device.throttling_failure_timestamp
131        self.assertAlmostEqual(required.total_seconds(), 3, places=3)

The stage's email_otp_throttling_factor is pushed onto the device before verify.

def test_factor_zero_disables_throttling_end_to_end(self):
133    def test_factor_zero_disables_throttling_end_to_end(self):
134        """With email_otp_throttling_factor=0, repeated failures do not lock the device."""
135        stage = self._validate_stage(email_otp_throttling_factor=0)
136        device = self._email_device()
137        device.generate_token()
138        token = device.token
139        for _ in range(10):
140            with self.assertRaises(ValidationError):
141                validate_challenge_code("000000", self._stage_view(stage), self.user)
142        matched = validate_challenge_code(token, self._stage_view(stage), self.user)
143        self.assertEqual(matched.pk, device.pk)

With email_otp_throttling_factor=0, repeated failures do not lock the device.

def test_lockout_persists_across_calls(self):
145    def test_lockout_persists_across_calls(self):
146        """
147        A correct token on the second call is still blocked and does not increment the counter.
148        """
149        stage = self._validate_stage(email_otp_throttling_factor=1)
150        device = self._email_device()
151        device.generate_token()
152        token = device.token
153        invalid_token = "000000" if token != "000000" else "111111"  # nosec
154        with self.assertRaises(ValidationError):
155            validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
156        # Immediately try with the correct token: lockout still active, attempt must be rejected.
157        with self.assertRaises(ValidationError):
158            validate_challenge_code(token, self._stage_view(stage), self.user)
159        device.refresh_from_db()
160        # Token wasn't consumed (verification never ran), and counter didn't get incremented.
161        self.assertEqual(device.token, token)
162        self.assertEqual(device.throttling_failure_count, 1)

A correct token on the second call is still blocked and does not increment the counter.

class ValidateStageThrottlingFlowTests(authentik.flows.tests.FlowTestCase):
165class ValidateStageThrottlingFlowTests(FlowTestCase):
166    """End-to-end lockout behavior through the flow executor HTTP API."""
167
168    def setUp(self) -> None:
169        super().setUp()
170        self.user = create_test_admin_user()
171        self.email_stage = AuthenticatorEmailStage.objects.create(
172            name="email-stage-flow-throttle",
173            use_global_settings=True,
174            from_address="test@authentik.local",
175            token_expiry="minutes=30",
176        )  # nosec
177        self.ident_stage = IdentificationStage.objects.create(
178            name=generate_id(),
179            user_fields=[UserFields.USERNAME],
180        )
181        self.validate_stage = AuthenticatorValidateStage.objects.create(
182            name=generate_id(),
183            device_classes=[DeviceClasses.EMAIL],
184            email_otp_throttling_factor=1,
185        )
186        self.flow = create_test_flow()
187        FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
188        FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)
189
190    def _identify(self):
191        response = self.client.post(
192            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
193            {"uid_field": self.user.username},
194            follow=True,
195        )
196        self.assertEqual(response.status_code, 200)
197
198    def _select_email(self, device: EmailDevice):
199        self.client.post(
200            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
201            {
202                "component": "ak-stage-authenticator-validate",
203                "selected_challenge": {
204                    "device_class": "email",
205                    "device_uid": str(device.pk),
206                    "challenge": {},
207                    "last_used": None,
208                },
209            },
210        )
211
212    def test_bad_code_then_correct_code_is_still_blocked(self):
213        """After a bad code over HTTP, a subsequent correct code is still rejected
214        because the lockout persists in the database."""
215        device = EmailDevice.objects.create(
216            user=self.user,
217            confirmed=True,
218            stage=self.email_stage,
219            email="throttle-flow@authentik.local",
220        )
221        self._identify()
222        self._select_email(device)
223        # Server generated and stored the token - grab it from DB.
224        device.refresh_from_db()
225        token = device.token
226        # First attempt: bad code - must increment the DB counter.
227        self.client.post(
228            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
229            {"component": "ak-stage-authenticator-validate", "code": "000000"},
230        )
231        device.refresh_from_db()
232        self.assertEqual(device.throttling_failure_count, 1)
233        self.assertEqual(device.token, token)
234        # Second attempt with the correct token - still blocked.
235        response = self.client.post(
236            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
237            {"component": "ak-stage-authenticator-validate", "code": token},
238        )
239        self.assertStageResponse(
240            response,
241            flow=self.flow,
242            component="ak-stage-authenticator-validate",
243        )
244        device.refresh_from_db()
245        # Counter wasn't incremented on a blocked attempt
246        self.assertEqual(device.throttling_failure_count, 1)
247        # Token wasn't consumed.
248        self.assertEqual(device.token, token)

End-to-end lockout behavior through the flow executor HTTP API.

def setUp(self) -> None:
168    def setUp(self) -> None:
169        super().setUp()
170        self.user = create_test_admin_user()
171        self.email_stage = AuthenticatorEmailStage.objects.create(
172            name="email-stage-flow-throttle",
173            use_global_settings=True,
174            from_address="test@authentik.local",
175            token_expiry="minutes=30",
176        )  # nosec
177        self.ident_stage = IdentificationStage.objects.create(
178            name=generate_id(),
179            user_fields=[UserFields.USERNAME],
180        )
181        self.validate_stage = AuthenticatorValidateStage.objects.create(
182            name=generate_id(),
183            device_classes=[DeviceClasses.EMAIL],
184            email_otp_throttling_factor=1,
185        )
186        self.flow = create_test_flow()
187        FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
188        FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)

Hook method for setting up the test fixture before exercising it.

def test_bad_code_then_correct_code_is_still_blocked(self):
212    def test_bad_code_then_correct_code_is_still_blocked(self):
213        """After a bad code over HTTP, a subsequent correct code is still rejected
214        because the lockout persists in the database."""
215        device = EmailDevice.objects.create(
216            user=self.user,
217            confirmed=True,
218            stage=self.email_stage,
219            email="throttle-flow@authentik.local",
220        )
221        self._identify()
222        self._select_email(device)
223        # Server generated and stored the token - grab it from DB.
224        device.refresh_from_db()
225        token = device.token
226        # First attempt: bad code - must increment the DB counter.
227        self.client.post(
228            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
229            {"component": "ak-stage-authenticator-validate", "code": "000000"},
230        )
231        device.refresh_from_db()
232        self.assertEqual(device.throttling_failure_count, 1)
233        self.assertEqual(device.token, token)
234        # Second attempt with the correct token - still blocked.
235        response = self.client.post(
236            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
237            {"component": "ak-stage-authenticator-validate", "code": token},
238        )
239        self.assertStageResponse(
240            response,
241            flow=self.flow,
242            component="ak-stage-authenticator-validate",
243        )
244        device.refresh_from_db()
245        # Counter wasn't incremented on a blocked attempt
246        self.assertEqual(device.throttling_failure_count, 1)
247        # Token wasn't consumed.
248        self.assertEqual(device.token, token)

After a bad code over HTTP, a subsequent correct code is still rejected because the lockout persists in the database.