authentik.stages.authenticator_sms.tests

Test SMS API

  1"""Test SMS API"""
  2
  3from unittest.mock import MagicMock, patch
  4from urllib.parse import parse_qsl
  5
  6from django.test import TestCase
  7from django.urls import reverse
  8from requests_mock import Mocker
  9
 10from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 11from authentik.flows.models import FlowStageBinding
 12from authentik.flows.planner import FlowPlan
 13from authentik.flows.tests import FlowTestCase
 14from authentik.flows.views.executor import SESSION_KEY_PLAN
 15from authentik.lib.generators import generate_id
 16from authentik.stages.authenticator.tests import ThrottlingTestMixin
 17from authentik.stages.authenticator_sms.models import (
 18    AuthenticatorSMSStage,
 19    SMSDevice,
 20    SMSProviders,
 21    hash_phone_number,
 22)
 23from authentik.stages.authenticator_sms.stage import PLAN_CONTEXT_PHONE, PLAN_CONTEXT_SMS_DEVICE
 24from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 25
 26
 27class AuthenticatorSMSStageTests(FlowTestCase):
 28    """Test SMS API"""
 29
 30    def setUp(self) -> None:
 31        super().setUp()
 32        self.flow = create_test_flow()
 33        self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(
 34            name="foo",
 35            provider=SMSProviders.TWILIO,
 36            configure_flow=self.flow,
 37        )
 38        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 39        self.user = create_test_admin_user()
 40        self.client.force_login(self.user)
 41
 42    def test_stage_no_prefill(self):
 43        """test stage"""
 44        self.client.get(
 45            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
 46        )
 47        response = self.client.get(
 48            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 49        )
 50        self.assertStageResponse(
 51            response,
 52            self.flow,
 53            self.user,
 54            component="ak-stage-authenticator-sms",
 55            phone_number_required=True,
 56        )
 57
 58    def test_stage_submit(self):
 59        """test stage (submit)"""
 60        self.client.get(
 61            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
 62        )
 63        response = self.client.get(
 64            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 65        )
 66        self.assertStageResponse(
 67            response,
 68            self.flow,
 69            self.user,
 70            component="ak-stage-authenticator-sms",
 71            phone_number_required=True,
 72        )
 73        sms_send_mock = MagicMock()
 74        with patch(
 75            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
 76            sms_send_mock,
 77        ):
 78            response = self.client.post(
 79                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 80                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
 81            )
 82            self.assertEqual(response.status_code, 200)
 83            sms_send_mock.assert_called_once()
 84        self.assertStageResponse(
 85            response,
 86            self.flow,
 87            self.user,
 88            component="ak-stage-authenticator-sms",
 89            response_errors={},
 90            phone_number_required=False,
 91        )
 92
 93    def test_stage_submit_twilio(self):
 94        """test stage (submit) (twilio)"""
 95        self.stage.account_sid = generate_id()
 96        self.stage.auth = generate_id()
 97        self.stage.from_number = generate_id()
 98        self.stage.save()
 99        self.client.get(
100            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
101        )
102        response = self.client.get(
103            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
104        )
105        self.assertStageResponse(
106            response,
107            self.flow,
108            self.user,
109            component="ak-stage-authenticator-sms",
110            phone_number_required=True,
111        )
112        number = generate_id()
113
114        with Mocker() as mocker:
115            mocker.post(
116                (
117                    "https://api.twilio.com/2010-04-01/Accounts/"
118                    f"{self.stage.account_sid}/Messages.json"
119                ),
120                json={},
121            )
122            response = self.client.post(
123                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
124                data={"component": "ak-stage-authenticator-sms", "phone_number": number},
125            )
126            self.assertEqual(response.status_code, 200)
127            self.assertEqual(mocker.call_count, 1)
128            self.assertEqual(mocker.request_history[0].method, "POST")
129            request_body = dict(parse_qsl(mocker.request_history[0].body))
130            device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
131            self.assertEqual(
132                request_body,
133                {
134                    "To": number,
135                    "From": self.stage.from_number,
136                    "Body": f"Use this code to authenticate in authentik: {device.token}",
137                },
138            )
139        self.assertStageResponse(
140            response,
141            self.flow,
142            self.user,
143            component="ak-stage-authenticator-sms",
144            response_errors={},
145            phone_number_required=False,
146        )
147
148    def test_stage_context_data(self):
149        """test stage context data"""
150        self.client.get(
151            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
152        )
153        sms_send_mock = MagicMock()
154        with (
155            patch(
156                (
157                    "authentik.stages.authenticator_sms.stage."
158                    "AuthenticatorSMSStageView._has_phone_number"
159                ),
160                MagicMock(
161                    return_value="1234",
162                ),
163            ),
164            patch(
165                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
166                sms_send_mock,
167            ),
168        ):
169            response = self.client.get(
170                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
171            )
172            sms_send_mock.assert_called_once()
173        self.assertStageResponse(
174            response,
175            self.flow,
176            self.user,
177            component="ak-stage-authenticator-sms",
178            phone_number_required=False,
179        )
180
181    def test_stage_context_data_duplicate(self):
182        """test stage context data (phone number exists already)"""
183        self.client.get(
184            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
185        )
186        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
187        plan.context[PLAN_CONTEXT_PROMPT] = {
188            PLAN_CONTEXT_PHONE: "1234",
189        }
190        session = self.client.session
191        session[SESSION_KEY_PLAN] = plan
192        session.save()
193        SMSDevice.objects.create(
194            phone_number="1234",
195            user=self.user,
196            stage=self.stage,
197        )
198        sms_send_mock = MagicMock()
199        with (
200            patch(
201                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
202                sms_send_mock,
203            ),
204        ):
205            response = self.client.get(
206                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
207            )
208        self.assertStageResponse(
209            response,
210            self.flow,
211            self.user,
212            component="ak-stage-authenticator-sms",
213            phone_number_required=True,
214        )
215        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
216        self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], {})
217
218    def test_stage_submit_full(self):
219        """test stage (submit)"""
220        self.client.get(
221            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
222        )
223        response = self.client.get(
224            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
225        )
226        self.assertStageResponse(
227            response,
228            self.flow,
229            self.user,
230            component="ak-stage-authenticator-sms",
231            phone_number_required=True,
232        )
233        sms_send_mock = MagicMock()
234        with patch(
235            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
236            sms_send_mock,
237        ):
238            response = self.client.post(
239                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
240                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
241            )
242            self.assertEqual(response.status_code, 200)
243            sms_send_mock.assert_called_once()
244        self.assertStageResponse(
245            response,
246            self.flow,
247            self.user,
248            component="ak-stage-authenticator-sms",
249            response_errors={},
250            phone_number_required=False,
251        )
252        with patch(
253            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
254            MagicMock(return_value=True),
255        ):
256            response = self.client.post(
257                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
258                data={
259                    "component": "ak-stage-authenticator-sms",
260                    "phone_number": "foo",
261                    "code": "123456",
262                },
263            )
264        self.assertEqual(response.status_code, 200)
265        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
266
267    def test_stage_hash(self):
268        """test stage (verify_only)"""
269        self.stage.verify_only = True
270        self.stage.save()
271        self.client.get(
272            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
273        )
274        response = self.client.get(
275            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
276        )
277        self.assertStageResponse(
278            response,
279            self.flow,
280            self.user,
281            component="ak-stage-authenticator-sms",
282            phone_number_required=True,
283        )
284        sms_send_mock = MagicMock()
285        with patch(
286            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
287            sms_send_mock,
288        ):
289            response = self.client.post(
290                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
291                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
292            )
293            self.assertEqual(response.status_code, 200)
294            sms_send_mock.assert_called_once()
295        self.assertStageResponse(
296            response,
297            self.flow,
298            self.user,
299            component="ak-stage-authenticator-sms",
300            response_errors={},
301            phone_number_required=False,
302        )
303        with patch(
304            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
305            MagicMock(return_value=True),
306        ):
307            response = self.client.post(
308                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
309                data={
310                    "component": "ak-stage-authenticator-sms",
311                    "phone_number": "foo",
312                    "code": "123456",
313                },
314            )
315        self.assertEqual(response.status_code, 200)
316        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
317        device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
318        self.assertTrue(device.is_hashed)
319
320    def test_stage_hash_twice(self):
321        """test stage (hash + duplicate)"""
322        SMSDevice.objects.create(
323            user=create_test_admin_user(),
324            stage=self.stage,
325            phone_number=hash_phone_number("foo"),
326        )
327        self.stage.verify_only = True
328        self.stage.save()
329        self.client.get(
330            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
331        )
332        response = self.client.get(
333            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
334        )
335        self.assertStageResponse(
336            response,
337            self.flow,
338            self.user,
339            component="ak-stage-authenticator-sms",
340            phone_number_required=True,
341        )
342        sms_send_mock = MagicMock()
343        with patch(
344            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
345            sms_send_mock,
346        ):
347            response = self.client.post(
348                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
349                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
350            )
351            self.assertEqual(response.status_code, 200)
352        self.assertStageResponse(
353            response,
354            self.flow,
355            self.user,
356            component="ak-stage-authenticator-sms",
357            response_errors={
358                "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
359            },
360            phone_number_required=False,
361        )
362
363
class TestSMSDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Exercise ThrottlingMixin behaviour on SMSDevice.verify_token."""

    def setUp(self):
        """Create an SMS device on a generic-provider stage and generate its token."""
        super().setUp()
        flow = create_test_flow()
        owner = create_test_admin_user()
        # NOTE(review): AuthenticatorSMSStageTests in this file creates the stage with
        # ``configure_flow=``; confirm that ``flow=`` is the intended keyword here.
        stage_options = {
            "flow": flow,
            "name": "sms-throttle",
            "provider": SMSProviders.GENERIC,
            "from_number": "1234",
        }
        sms_stage = AuthenticatorSMSStage.objects.create(**stage_options)
        self.device = SMSDevice.objects.create(
            user=owner,
            stage=sms_stage,
            phone_number="+15551230001",
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device."""
        return self.device.token

    def invalid_token(self):
        """Return a six-character string that differs from the device's token."""
        if self.device.token == "000000":
            return "111111"
        return "000000"
class AuthenticatorSMSStageTests(authentik.flows.tests.FlowTestCase):
 28class AuthenticatorSMSStageTests(FlowTestCase):
 29    """Test SMS API"""
 30
 31    def setUp(self) -> None:
 32        super().setUp()
 33        self.flow = create_test_flow()
 34        self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(
 35            name="foo",
 36            provider=SMSProviders.TWILIO,
 37            configure_flow=self.flow,
 38        )
 39        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 40        self.user = create_test_admin_user()
 41        self.client.force_login(self.user)
 42
 43    def test_stage_no_prefill(self):
 44        """test stage"""
 45        self.client.get(
 46            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
 47        )
 48        response = self.client.get(
 49            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 50        )
 51        self.assertStageResponse(
 52            response,
 53            self.flow,
 54            self.user,
 55            component="ak-stage-authenticator-sms",
 56            phone_number_required=True,
 57        )
 58
 59    def test_stage_submit(self):
 60        """test stage (submit)"""
 61        self.client.get(
 62            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
 63        )
 64        response = self.client.get(
 65            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 66        )
 67        self.assertStageResponse(
 68            response,
 69            self.flow,
 70            self.user,
 71            component="ak-stage-authenticator-sms",
 72            phone_number_required=True,
 73        )
 74        sms_send_mock = MagicMock()
 75        with patch(
 76            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
 77            sms_send_mock,
 78        ):
 79            response = self.client.post(
 80                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 81                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
 82            )
 83            self.assertEqual(response.status_code, 200)
 84            sms_send_mock.assert_called_once()
 85        self.assertStageResponse(
 86            response,
 87            self.flow,
 88            self.user,
 89            component="ak-stage-authenticator-sms",
 90            response_errors={},
 91            phone_number_required=False,
 92        )
 93
 94    def test_stage_submit_twilio(self):
 95        """test stage (submit) (twilio)"""
 96        self.stage.account_sid = generate_id()
 97        self.stage.auth = generate_id()
 98        self.stage.from_number = generate_id()
 99        self.stage.save()
100        self.client.get(
101            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
102        )
103        response = self.client.get(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
105        )
106        self.assertStageResponse(
107            response,
108            self.flow,
109            self.user,
110            component="ak-stage-authenticator-sms",
111            phone_number_required=True,
112        )
113        number = generate_id()
114
115        with Mocker() as mocker:
116            mocker.post(
117                (
118                    "https://api.twilio.com/2010-04-01/Accounts/"
119                    f"{self.stage.account_sid}/Messages.json"
120                ),
121                json={},
122            )
123            response = self.client.post(
124                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
125                data={"component": "ak-stage-authenticator-sms", "phone_number": number},
126            )
127            self.assertEqual(response.status_code, 200)
128            self.assertEqual(mocker.call_count, 1)
129            self.assertEqual(mocker.request_history[0].method, "POST")
130            request_body = dict(parse_qsl(mocker.request_history[0].body))
131            device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
132            self.assertEqual(
133                request_body,
134                {
135                    "To": number,
136                    "From": self.stage.from_number,
137                    "Body": f"Use this code to authenticate in authentik: {device.token}",
138                },
139            )
140        self.assertStageResponse(
141            response,
142            self.flow,
143            self.user,
144            component="ak-stage-authenticator-sms",
145            response_errors={},
146            phone_number_required=False,
147        )
148
149    def test_stage_context_data(self):
150        """test stage context data"""
151        self.client.get(
152            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
153        )
154        sms_send_mock = MagicMock()
155        with (
156            patch(
157                (
158                    "authentik.stages.authenticator_sms.stage."
159                    "AuthenticatorSMSStageView._has_phone_number"
160                ),
161                MagicMock(
162                    return_value="1234",
163                ),
164            ),
165            patch(
166                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
167                sms_send_mock,
168            ),
169        ):
170            response = self.client.get(
171                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
172            )
173            sms_send_mock.assert_called_once()
174        self.assertStageResponse(
175            response,
176            self.flow,
177            self.user,
178            component="ak-stage-authenticator-sms",
179            phone_number_required=False,
180        )
181
182    def test_stage_context_data_duplicate(self):
183        """test stage context data (phone number exists already)"""
184        self.client.get(
185            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
186        )
187        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
188        plan.context[PLAN_CONTEXT_PROMPT] = {
189            PLAN_CONTEXT_PHONE: "1234",
190        }
191        session = self.client.session
192        session[SESSION_KEY_PLAN] = plan
193        session.save()
194        SMSDevice.objects.create(
195            phone_number="1234",
196            user=self.user,
197            stage=self.stage,
198        )
199        sms_send_mock = MagicMock()
200        with (
201            patch(
202                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
203                sms_send_mock,
204            ),
205        ):
206            response = self.client.get(
207                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208            )
209        self.assertStageResponse(
210            response,
211            self.flow,
212            self.user,
213            component="ak-stage-authenticator-sms",
214            phone_number_required=True,
215        )
216        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
217        self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], {})
218
219    def test_stage_submit_full(self):
220        """test stage (submit)"""
221        self.client.get(
222            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
223        )
224        response = self.client.get(
225            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
226        )
227        self.assertStageResponse(
228            response,
229            self.flow,
230            self.user,
231            component="ak-stage-authenticator-sms",
232            phone_number_required=True,
233        )
234        sms_send_mock = MagicMock()
235        with patch(
236            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
237            sms_send_mock,
238        ):
239            response = self.client.post(
240                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
241                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
242            )
243            self.assertEqual(response.status_code, 200)
244            sms_send_mock.assert_called_once()
245        self.assertStageResponse(
246            response,
247            self.flow,
248            self.user,
249            component="ak-stage-authenticator-sms",
250            response_errors={},
251            phone_number_required=False,
252        )
253        with patch(
254            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
255            MagicMock(return_value=True),
256        ):
257            response = self.client.post(
258                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
259                data={
260                    "component": "ak-stage-authenticator-sms",
261                    "phone_number": "foo",
262                    "code": "123456",
263                },
264            )
265        self.assertEqual(response.status_code, 200)
266        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
267
268    def test_stage_hash(self):
269        """test stage (verify_only)"""
270        self.stage.verify_only = True
271        self.stage.save()
272        self.client.get(
273            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
274        )
275        response = self.client.get(
276            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
277        )
278        self.assertStageResponse(
279            response,
280            self.flow,
281            self.user,
282            component="ak-stage-authenticator-sms",
283            phone_number_required=True,
284        )
285        sms_send_mock = MagicMock()
286        with patch(
287            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
288            sms_send_mock,
289        ):
290            response = self.client.post(
291                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
292                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
293            )
294            self.assertEqual(response.status_code, 200)
295            sms_send_mock.assert_called_once()
296        self.assertStageResponse(
297            response,
298            self.flow,
299            self.user,
300            component="ak-stage-authenticator-sms",
301            response_errors={},
302            phone_number_required=False,
303        )
304        with patch(
305            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
306            MagicMock(return_value=True),
307        ):
308            response = self.client.post(
309                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
310                data={
311                    "component": "ak-stage-authenticator-sms",
312                    "phone_number": "foo",
313                    "code": "123456",
314                },
315            )
316        self.assertEqual(response.status_code, 200)
317        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
318        device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
319        self.assertTrue(device.is_hashed)
320
321    def test_stage_hash_twice(self):
322        """test stage (hash + duplicate)"""
323        SMSDevice.objects.create(
324            user=create_test_admin_user(),
325            stage=self.stage,
326            phone_number=hash_phone_number("foo"),
327        )
328        self.stage.verify_only = True
329        self.stage.save()
330        self.client.get(
331            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
332        )
333        response = self.client.get(
334            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
335        )
336        self.assertStageResponse(
337            response,
338            self.flow,
339            self.user,
340            component="ak-stage-authenticator-sms",
341            phone_number_required=True,
342        )
343        sms_send_mock = MagicMock()
344        with patch(
345            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
346            sms_send_mock,
347        ):
348            response = self.client.post(
349                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
350                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
351            )
352            self.assertEqual(response.status_code, 200)
353        self.assertStageResponse(
354            response,
355            self.flow,
356            self.user,
357            component="ak-stage-authenticator-sms",
358            response_errors={
359                "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
360            },
361            phone_number_required=False,
362        )

Test SMS API

def setUp(self) -> None:
31    def setUp(self) -> None:
32        super().setUp()
33        self.flow = create_test_flow()
34        self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(
35            name="foo",
36            provider=SMSProviders.TWILIO,
37            configure_flow=self.flow,
38        )
39        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
40        self.user = create_test_admin_user()
41        self.client.force_login(self.user)

Hook method for setting up the test fixture before exercising it.

def test_stage_no_prefill(self):
43    def test_stage_no_prefill(self):
44        """test stage"""
45        self.client.get(
46            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
47        )
48        response = self.client.get(
49            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
50        )
51        self.assertStageResponse(
52            response,
53            self.flow,
54            self.user,
55            component="ak-stage-authenticator-sms",
56            phone_number_required=True,
57        )

test stage

def test_stage_submit(self):
59    def test_stage_submit(self):
60        """test stage (submit)"""
61        self.client.get(
62            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
63        )
64        response = self.client.get(
65            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
66        )
67        self.assertStageResponse(
68            response,
69            self.flow,
70            self.user,
71            component="ak-stage-authenticator-sms",
72            phone_number_required=True,
73        )
74        sms_send_mock = MagicMock()
75        with patch(
76            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
77            sms_send_mock,
78        ):
79            response = self.client.post(
80                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
81                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
82            )
83            self.assertEqual(response.status_code, 200)
84            sms_send_mock.assert_called_once()
85        self.assertStageResponse(
86            response,
87            self.flow,
88            self.user,
89            component="ak-stage-authenticator-sms",
90            response_errors={},
91            phone_number_required=False,
92        )

test stage (submit)

def test_stage_submit_twilio(self):
 94    def test_stage_submit_twilio(self):
 95        """test stage (submit) (twilio)"""
 96        self.stage.account_sid = generate_id()
 97        self.stage.auth = generate_id()
 98        self.stage.from_number = generate_id()
 99        self.stage.save()
100        self.client.get(
101            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
102        )
103        response = self.client.get(
104            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
105        )
106        self.assertStageResponse(
107            response,
108            self.flow,
109            self.user,
110            component="ak-stage-authenticator-sms",
111            phone_number_required=True,
112        )
113        number = generate_id()
114
115        with Mocker() as mocker:
116            mocker.post(
117                (
118                    "https://api.twilio.com/2010-04-01/Accounts/"
119                    f"{self.stage.account_sid}/Messages.json"
120                ),
121                json={},
122            )
123            response = self.client.post(
124                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
125                data={"component": "ak-stage-authenticator-sms", "phone_number": number},
126            )
127            self.assertEqual(response.status_code, 200)
128            self.assertEqual(mocker.call_count, 1)
129            self.assertEqual(mocker.request_history[0].method, "POST")
130            request_body = dict(parse_qsl(mocker.request_history[0].body))
131            device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
132            self.assertEqual(
133                request_body,
134                {
135                    "To": number,
136                    "From": self.stage.from_number,
137                    "Body": f"Use this code to authenticate in authentik: {device.token}",
138                },
139            )
140        self.assertStageResponse(
141            response,
142            self.flow,
143            self.user,
144            component="ak-stage-authenticator-sms",
145            response_errors={},
146            phone_number_required=False,
147        )

test stage (submit) (twilio)

def test_stage_context_data(self):
149    def test_stage_context_data(self):
150        """test stage context data"""
151        self.client.get(
152            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
153        )
154        sms_send_mock = MagicMock()
155        with (
156            patch(
157                (
158                    "authentik.stages.authenticator_sms.stage."
159                    "AuthenticatorSMSStageView._has_phone_number"
160                ),
161                MagicMock(
162                    return_value="1234",
163                ),
164            ),
165            patch(
166                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
167                sms_send_mock,
168            ),
169        ):
170            response = self.client.get(
171                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
172            )
173            sms_send_mock.assert_called_once()
174        self.assertStageResponse(
175            response,
176            self.flow,
177            self.user,
178            component="ak-stage-authenticator-sms",
179            phone_number_required=False,
180        )

test stage context data

def test_stage_context_data_duplicate(self):
182    def test_stage_context_data_duplicate(self):
183        """test stage context data (phone number exists already)"""
184        self.client.get(
185            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
186        )
187        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
188        plan.context[PLAN_CONTEXT_PROMPT] = {
189            PLAN_CONTEXT_PHONE: "1234",
190        }
191        session = self.client.session
192        session[SESSION_KEY_PLAN] = plan
193        session.save()
194        SMSDevice.objects.create(
195            phone_number="1234",
196            user=self.user,
197            stage=self.stage,
198        )
199        sms_send_mock = MagicMock()
200        with (
201            patch(
202                "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
203                sms_send_mock,
204            ),
205        ):
206            response = self.client.get(
207                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208            )
209        self.assertStageResponse(
210            response,
211            self.flow,
212            self.user,
213            component="ak-stage-authenticator-sms",
214            phone_number_required=True,
215        )
216        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
217        self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], {})

test stage context data (phone number exists already)

def test_stage_submit_full(self):
219    def test_stage_submit_full(self):
220        """test stage (submit)"""
221        self.client.get(
222            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
223        )
224        response = self.client.get(
225            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
226        )
227        self.assertStageResponse(
228            response,
229            self.flow,
230            self.user,
231            component="ak-stage-authenticator-sms",
232            phone_number_required=True,
233        )
234        sms_send_mock = MagicMock()
235        with patch(
236            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
237            sms_send_mock,
238        ):
239            response = self.client.post(
240                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
241                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
242            )
243            self.assertEqual(response.status_code, 200)
244            sms_send_mock.assert_called_once()
245        self.assertStageResponse(
246            response,
247            self.flow,
248            self.user,
249            component="ak-stage-authenticator-sms",
250            response_errors={},
251            phone_number_required=False,
252        )
253        with patch(
254            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
255            MagicMock(return_value=True),
256        ):
257            response = self.client.post(
258                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
259                data={
260                    "component": "ak-stage-authenticator-sms",
261                    "phone_number": "foo",
262                    "code": "123456",
263                },
264            )
265        self.assertEqual(response.status_code, 200)
266        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

test stage (submit phone number, then verification code)

def test_stage_hash(self):
268    def test_stage_hash(self):
269        """test stage (verify_only)"""
270        self.stage.verify_only = True
271        self.stage.save()
272        self.client.get(
273            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
274        )
275        response = self.client.get(
276            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
277        )
278        self.assertStageResponse(
279            response,
280            self.flow,
281            self.user,
282            component="ak-stage-authenticator-sms",
283            phone_number_required=True,
284        )
285        sms_send_mock = MagicMock()
286        with patch(
287            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
288            sms_send_mock,
289        ):
290            response = self.client.post(
291                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
292                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
293            )
294            self.assertEqual(response.status_code, 200)
295            sms_send_mock.assert_called_once()
296        self.assertStageResponse(
297            response,
298            self.flow,
299            self.user,
300            component="ak-stage-authenticator-sms",
301            response_errors={},
302            phone_number_required=False,
303        )
304        with patch(
305            "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
306            MagicMock(return_value=True),
307        ):
308            response = self.client.post(
309                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
310                data={
311                    "component": "ak-stage-authenticator-sms",
312                    "phone_number": "foo",
313                    "code": "123456",
314                },
315            )
316        self.assertEqual(response.status_code, 200)
317        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
318        device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
319        self.assertTrue(device.is_hashed)

test stage (verify_only)

def test_stage_hash_twice(self):
321    def test_stage_hash_twice(self):
322        """test stage (hash + duplicate)"""
323        SMSDevice.objects.create(
324            user=create_test_admin_user(),
325            stage=self.stage,
326            phone_number=hash_phone_number("foo"),
327        )
328        self.stage.verify_only = True
329        self.stage.save()
330        self.client.get(
331            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
332        )
333        response = self.client.get(
334            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
335        )
336        self.assertStageResponse(
337            response,
338            self.flow,
339            self.user,
340            component="ak-stage-authenticator-sms",
341            phone_number_required=True,
342        )
343        sms_send_mock = MagicMock()
344        with patch(
345            "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
346            sms_send_mock,
347        ):
348            response = self.client.post(
349                reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
350                data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
351            )
352            self.assertEqual(response.status_code, 200)
353        self.assertStageResponse(
354            response,
355            self.flow,
356            self.user,
357            component="ak-stage-authenticator-sms",
358            response_errors={
359                "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
360            },
361            phone_number_required=False,
362        )

test stage (hash + duplicate)

class TestSMSDeviceThrottling(authentik.stages.authenticator.tests.ThrottlingTestMixin, django.test.testcases.TestCase):
class TestSMSDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Test ThrottlingMixin behaviour on SMSDevice.verify_token"""

    def setUp(self):
        """Create a stage and an SMS device with a freshly generated token."""
        super().setUp()
        flow = create_test_flow()
        user = create_test_admin_user()
        stage_kwargs = {
            "flow": flow,
            "name": "sms-throttle",
            "provider": SMSProviders.GENERIC,
            "from_number": "1234",
        }
        stage = AuthenticatorSMSStage.objects.create(**stage_kwargs)
        device = SMSDevice.objects.create(
            user=user,
            stage=stage,
            phone_number="+15551230001",
        )
        self.device = device
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently set on the device under test."""
        current_token = self.device.token
        return current_token

    def invalid_token(self):
        """Return a six-digit string that differs from the device's current token."""
        if self.device.token != "000000":
            return "000000"
        return "111111"

Test ThrottlingMixin behaviour on SMSDevice.verify_token

def setUp(self):
368    def setUp(self):
369        super().setUp()
370        flow = create_test_flow()
371        user = create_test_admin_user()
372        stage = AuthenticatorSMSStage.objects.create(
373            flow=flow,
374            name="sms-throttle",
375            provider=SMSProviders.GENERIC,
376            from_number="1234",
377        )
378        self.device = SMSDevice.objects.create(
379            user=user,
380            stage=stage,
381            phone_number="+15551230001",
382        )
383        self.device.generate_token()

Hook method for setting up the test fixture before exercising it.

def valid_token(self):
385    def valid_token(self):
386        return self.device.token

Returns a valid token to pass to our device under test.

def invalid_token(self):
388    def invalid_token(self):
389        return "000000" if self.device.token != "000000" else "111111"

Returns an invalid token to pass to our device under test.