authentik.stages.authenticator_sms.tests
Test SMS API
"""Test SMS API"""

from unittest.mock import MagicMock, patch
from urllib.parse import parse_qsl

from django.test import TestCase
from django.urls import reverse
from requests_mock import Mocker

from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import FlowStageBinding
from authentik.flows.planner import FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.stages.authenticator.tests import ThrottlingTestMixin
from authentik.stages.authenticator_sms.models import (
    AuthenticatorSMSStage,
    SMSDevice,
    SMSProviders,
    hash_phone_number,
)
from authentik.stages.authenticator_sms.stage import PLAN_CONTEXT_PHONE, PLAN_CONTEXT_SMS_DEVICE
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT


class AuthenticatorSMSStageTests(FlowTestCase):
    """Test SMS API"""

    # Component name sent with every submission and expected in every challenge.
    _COMPONENT = "ak-stage-authenticator-sms"
    # Patch target replaced with a mock so the tests can count send attempts.
    _SEND_PATH = "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send"
    # Patch target forced to return True when a code is submitted.
    _VERIFY_PATH = "authentik.stages.authenticator_sms.models.SMSDevice.verify_token"

    def setUp(self) -> None:
        """Create a Twilio SMS stage bound to a new flow and log a test user in."""
        super().setUp()
        self.flow = create_test_flow()
        self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(
            name="foo",
            provider=SMSProviders.TWILIO,
            configure_flow=self.flow,
        )
        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.user = create_test_admin_user()
        self.client.force_login(self.user)

    def _open_configure(self) -> None:
        """GET the configure view of the stage under test."""
        self.client.get(
            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
        )

    def _executor_url(self) -> str:
        """Return the flow-executor API URL of the test flow."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    def _fetch_challenge(self):
        """GET the flow executor and return the response."""
        return self.client.get(self._executor_url())

    def _submit(self, **fields):
        """POST `fields` together with the component name to the flow executor."""
        return self.client.post(
            self._executor_url(),
            data={"component": self._COMPONENT, **fields},
        )

    def _assert_challenge(self, response, **expected) -> None:
        """Assert `response` is an SMS stage response carrying the `expected` attributes."""
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component=self._COMPONENT,
            **expected,
        )

    def _submit_phone_number(self, number: str = "foo"):
        """Submit `number` with sending mocked; return `(response, send_mock)`.

        The response status is asserted to be 200; whether the mock was called
        is left to the caller, since the expectation differs between tests.
        """
        sms_send_mock = MagicMock()
        with patch(self._SEND_PATH, sms_send_mock):
            response = self._submit(phone_number=number)
        self.assertEqual(response.status_code, 200)
        return response, sms_send_mock

    def _submit_code(self):
        """Submit a code with `verify_token` patched to return True; return the response."""
        with patch(self._VERIFY_PATH, MagicMock(return_value=True)):
            response = self._submit(phone_number="foo", code="123456")
        self.assertEqual(response.status_code, 200)
        return response

    def test_stage_no_prefill(self):
        """test stage"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)

    def test_stage_submit(self):
        """test stage (submit)"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)

    def test_stage_submit_twilio(self):
        """test stage (submit) (twilio)"""
        self.stage.account_sid = generate_id()
        self.stage.auth = generate_id()
        self.stage.from_number = generate_id()
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        number = generate_id()

        with Mocker() as mocker:
            mocker.post(
                (
                    "https://api.twilio.com/2010-04-01/Accounts/"
                    f"{self.stage.account_sid}/Messages.json"
                ),
                json={},
            )
            response = self._submit(phone_number=number)
            self.assertEqual(response.status_code, 200)
            self.assertEqual(mocker.call_count, 1)
            sent_request = mocker.request_history[0]
            self.assertEqual(sent_request.method, "POST")
            # The request body is form-encoded, so decode it before comparing.
            request_body = dict(parse_qsl(sent_request.body))
            device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
            self.assertEqual(
                request_body,
                {
                    "To": number,
                    "From": self.stage.from_number,
                    "Body": f"Use this code to authenticate in authentik: {device.token}",
                },
            )
        self._assert_challenge(response, response_errors={}, phone_number_required=False)

    def test_stage_context_data(self):
        """test stage context data"""
        self._open_configure()
        sms_send_mock = MagicMock()
        with (
            patch(
                (
                    "authentik.stages.authenticator_sms.stage."
                    "AuthenticatorSMSStageView._has_phone_number"
                ),
                MagicMock(return_value="1234"),
            ),
            patch(self._SEND_PATH, sms_send_mock),
        ):
            response = self._fetch_challenge()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, phone_number_required=False)

    def test_stage_context_data_duplicate(self):
        """test stage context data (phone number exists already)"""
        self._open_configure()
        # Pre-fill the prompt context with a number that already belongs to a device.
        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
        plan.context[PLAN_CONTEXT_PROMPT] = {
            PLAN_CONTEXT_PHONE: "1234",
        }
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()
        SMSDevice.objects.create(
            phone_number="1234",
            user=self.user,
            stage=self.stage,
        )
        with patch(self._SEND_PATH, MagicMock()):
            response = self._fetch_challenge()
        self._assert_challenge(response, phone_number_required=True)
        plan = self.client.session[SESSION_KEY_PLAN]
        self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], {})

    def test_stage_submit_full(self):
        """test stage (submit phone number, then code)"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)
        response = self._submit_code()
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_stage_hash(self):
        """test stage (verify_only)"""
        self.stage.verify_only = True
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)
        response = self._submit_code()
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
        # first() returns None when nothing matches; fail with a clear assertion
        # instead of an AttributeError on the next line.
        self.assertIsNotNone(device)
        self.assertTrue(device.is_hashed)

    def test_stage_hash_twice(self):
        """test stage (hash + duplicate)"""
        # A different user already owns a device with the hashed number "foo".
        SMSDevice.objects.create(
            user=create_test_admin_user(),
            stage=self.stage,
            phone_number=hash_phone_number("foo"),
        )
        self.stage.verify_only = True
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, _ = self._submit_phone_number()
        self._assert_challenge(
            response,
            response_errors={
                "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
            },
            phone_number_required=False,
        )


class TestSMSDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Test ThrottlingMixin behaviour on SMSDevice.verify_token"""

    def setUp(self):
        """Create an SMS device on a generic-provider stage and generate its token."""
        super().setUp()
        flow = create_test_flow()
        user = create_test_admin_user()
        # NOTE(review): the other fixture in this module passes `configure_flow=`;
        # confirm `flow=` is the intended keyword here.
        stage = AuthenticatorSMSStage.objects.create(
            flow=flow,
            name="sms-throttle",
            provider=SMSProviders.GENERIC,
            from_number="1234",
        )
        self.device = SMSDevice.objects.create(
            user=user,
            stage=stage,
            phone_number="+15551230001",
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device."""
        return self.device.token

    def invalid_token(self):
        """Return a six-digit string that differs from the device's current token."""
        return "000000" if self.device.token != "000000" else "111111"
class AuthenticatorSMSStageTests(FlowTestCase):
    """Test SMS API"""

    # Component name sent with every submission and expected in every challenge.
    _COMPONENT = "ak-stage-authenticator-sms"
    # Patch target replaced with a mock so the tests can count send attempts.
    _SEND_PATH = "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send"
    # Patch target forced to return True when a code is submitted.
    _VERIFY_PATH = "authentik.stages.authenticator_sms.models.SMSDevice.verify_token"

    def setUp(self) -> None:
        """Create a Twilio SMS stage bound to a new flow and log a test user in."""
        super().setUp()
        self.flow = create_test_flow()
        self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(
            name="foo",
            provider=SMSProviders.TWILIO,
            configure_flow=self.flow,
        )
        FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.user = create_test_admin_user()
        self.client.force_login(self.user)

    def _open_configure(self) -> None:
        """GET the configure view of the stage under test."""
        self.client.get(
            reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
        )

    def _executor_url(self) -> str:
        """Return the flow-executor API URL of the test flow."""
        return reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    def _fetch_challenge(self):
        """GET the flow executor and return the response."""
        return self.client.get(self._executor_url())

    def _submit(self, **fields):
        """POST `fields` together with the component name to the flow executor."""
        return self.client.post(
            self._executor_url(),
            data={"component": self._COMPONENT, **fields},
        )

    def _assert_challenge(self, response, **expected) -> None:
        """Assert `response` is an SMS stage response carrying the `expected` attributes."""
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component=self._COMPONENT,
            **expected,
        )

    def _submit_phone_number(self, number: str = "foo"):
        """Submit `number` with sending mocked; return `(response, send_mock)`.

        The response status is asserted to be 200; whether the mock was called
        is left to the caller, since the expectation differs between tests.
        """
        sms_send_mock = MagicMock()
        with patch(self._SEND_PATH, sms_send_mock):
            response = self._submit(phone_number=number)
        self.assertEqual(response.status_code, 200)
        return response, sms_send_mock

    def _submit_code(self):
        """Submit a code with `verify_token` patched to return True; return the response."""
        with patch(self._VERIFY_PATH, MagicMock(return_value=True)):
            response = self._submit(phone_number="foo", code="123456")
        self.assertEqual(response.status_code, 200)
        return response

    def test_stage_no_prefill(self):
        """test stage"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)

    def test_stage_submit(self):
        """test stage (submit)"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)

    def test_stage_submit_twilio(self):
        """test stage (submit) (twilio)"""
        self.stage.account_sid = generate_id()
        self.stage.auth = generate_id()
        self.stage.from_number = generate_id()
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        number = generate_id()

        with Mocker() as mocker:
            mocker.post(
                (
                    "https://api.twilio.com/2010-04-01/Accounts/"
                    f"{self.stage.account_sid}/Messages.json"
                ),
                json={},
            )
            response = self._submit(phone_number=number)
            self.assertEqual(response.status_code, 200)
            self.assertEqual(mocker.call_count, 1)
            sent_request = mocker.request_history[0]
            self.assertEqual(sent_request.method, "POST")
            # The request body is form-encoded, so decode it before comparing.
            request_body = dict(parse_qsl(sent_request.body))
            device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
            self.assertEqual(
                request_body,
                {
                    "To": number,
                    "From": self.stage.from_number,
                    "Body": f"Use this code to authenticate in authentik: {device.token}",
                },
            )
        self._assert_challenge(response, response_errors={}, phone_number_required=False)

    def test_stage_context_data(self):
        """test stage context data"""
        self._open_configure()
        sms_send_mock = MagicMock()
        with (
            patch(
                (
                    "authentik.stages.authenticator_sms.stage."
                    "AuthenticatorSMSStageView._has_phone_number"
                ),
                MagicMock(return_value="1234"),
            ),
            patch(self._SEND_PATH, sms_send_mock),
        ):
            response = self._fetch_challenge()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, phone_number_required=False)

    def test_stage_context_data_duplicate(self):
        """test stage context data (phone number exists already)"""
        self._open_configure()
        # Pre-fill the prompt context with a number that already belongs to a device.
        plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
        plan.context[PLAN_CONTEXT_PROMPT] = {
            PLAN_CONTEXT_PHONE: "1234",
        }
        session = self.client.session
        session[SESSION_KEY_PLAN] = plan
        session.save()
        SMSDevice.objects.create(
            phone_number="1234",
            user=self.user,
            stage=self.stage,
        )
        with patch(self._SEND_PATH, MagicMock()):
            response = self._fetch_challenge()
        self._assert_challenge(response, phone_number_required=True)
        plan = self.client.session[SESSION_KEY_PLAN]
        self.assertEqual(plan.context[PLAN_CONTEXT_PROMPT], {})

    def test_stage_submit_full(self):
        """test stage (submit phone number, then code)"""
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)
        response = self._submit_code()
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_stage_hash(self):
        """test stage (verify_only)"""
        self.stage.verify_only = True
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, sms_send_mock = self._submit_phone_number()
        sms_send_mock.assert_called_once()
        self._assert_challenge(response, response_errors={}, phone_number_required=False)
        response = self._submit_code()
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
        # first() returns None when nothing matches; fail with a clear assertion
        # instead of an AttributeError on the next line.
        self.assertIsNotNone(device)
        self.assertTrue(device.is_hashed)

    def test_stage_hash_twice(self):
        """test stage (hash + duplicate)"""
        # A different user already owns a device with the hashed number "foo".
        SMSDevice.objects.create(
            user=create_test_admin_user(),
            stage=self.stage,
            phone_number=hash_phone_number("foo"),
        )
        self.stage.verify_only = True
        self.stage.save()
        self._open_configure()
        self._assert_challenge(self._fetch_challenge(), phone_number_required=True)
        response, _ = self._submit_phone_number()
        self._assert_challenge(
            response,
            response_errors={
                "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
            },
            phone_number_required=False,
        )
Test SMS API
def
setUp(self) -> None:
def setUp(self) -> None:
    """Create a Twilio SMS stage bound to a new flow and log a test user in."""
    super().setUp()
    self.flow = create_test_flow()
    stage_options = {
        "name": "foo",
        "provider": SMSProviders.TWILIO,
        "configure_flow": self.flow,
    }
    self.stage: AuthenticatorSMSStage = AuthenticatorSMSStage.objects.create(**stage_options)
    # Bind the stage to the flow at position 0.
    FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
    self.user = create_test_admin_user()
    self.client.force_login(self.user)
Hook method for setting up the test fixture before exercising it.
def
test_stage_no_prefill(self):
def test_stage_no_prefill(self):
    """The first challenge of the stage requires a phone number."""
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    # Visit the configure view before querying the executor.
    self.client.get(configure_url)
    challenge = self.client.get(executor_url)

    expected = {
        "component": "ak-stage-authenticator-sms",
        "phone_number_required": True,
    }
    self.assertStageResponse(challenge, self.flow, self.user, **expected)
test stage
def
test_stage_submit(self):
def test_stage_submit(self):
    """Submitting a phone number triggers exactly one send and drops the number prompt."""
    component = "ak-stage-authenticator-sms"
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    self.client.get(configure_url)
    challenge = self.client.get(executor_url)
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component=component,
        phone_number_required=True,
    )

    # Replace the stage's send method so the test can count send attempts.
    send_stub = MagicMock()
    with patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        send_stub,
    ):
        submitted = self.client.post(
            executor_url,
            data={"component": component, "phone_number": "foo"},
        )
    self.assertEqual(submitted.status_code, 200)
    send_stub.assert_called_once()
    self.assertStageResponse(
        submitted,
        self.flow,
        self.user,
        component=component,
        response_errors={},
        phone_number_required=False,
    )
test stage (submit)
def
test_stage_submit_twilio(self):
def test_stage_submit_twilio(self):
    """Submitting a number posts one form-encoded message to the Twilio API."""
    # Assign fresh random credentials and sender number, in this order.
    for attribute in ("account_sid", "auth", "from_number"):
        setattr(self.stage, attribute, generate_id())
    self.stage.save()

    component = "ak-stage-authenticator-sms"
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    self.client.get(configure_url)
    challenge = self.client.get(executor_url)
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component=component,
        phone_number_required=True,
    )
    number = generate_id()
    twilio_url = (
        "https://api.twilio.com/2010-04-01/Accounts/"
        f"{self.stage.account_sid}/Messages.json"
    )

    with Mocker() as mocker:
        mocker.post(twilio_url, json={})
        submitted = self.client.post(
            executor_url,
            data={"component": component, "phone_number": number},
        )
        self.assertEqual(submitted.status_code, 200)
        self.assertEqual(mocker.call_count, 1)
        outgoing = mocker.request_history[0]
        self.assertEqual(outgoing.method, "POST")
        # The request body is form-encoded, so decode it before comparing.
        request_body = dict(parse_qsl(outgoing.body))
        device: SMSDevice = self.get_flow_plan().context[PLAN_CONTEXT_SMS_DEVICE]
        expected_body = {
            "To": number,
            "From": self.stage.from_number,
            "Body": f"Use this code to authenticate in authentik: {device.token}",
        }
        self.assertEqual(request_body, expected_body)
    self.assertStageResponse(
        submitted,
        self.flow,
        self.user,
        component=component,
        response_errors={},
        phone_number_required=False,
    )
test stage (submit) (twilio)
def
test_stage_context_data(self):
def test_stage_context_data(self):
    """When the view reports a known phone number, one send happens on the first GET."""
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    self.client.get(configure_url)

    send_stub = MagicMock()
    # Make the view's `_has_phone_number` return "1234".
    known_number = patch(
        (
            "authentik.stages.authenticator_sms.stage."
            "AuthenticatorSMSStageView._has_phone_number"
        ),
        MagicMock(return_value="1234"),
    )
    no_send = patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        send_stub,
    )
    with known_number, no_send:
        challenge = self.client.get(executor_url)

    send_stub.assert_called_once()
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component="ak-stage-authenticator-sms",
        phone_number_required=False,
    )
test stage context data
def
test_stage_context_data_duplicate(self):
def test_stage_context_data_duplicate(self):
    """A pre-filled number that already has a device is dropped and asked for again."""
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    self.client.get(configure_url)

    # Store a pre-filled phone number in the plan's prompt context and persist it.
    stored_plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
    stored_plan.context[PLAN_CONTEXT_PROMPT] = {PLAN_CONTEXT_PHONE: "1234"}
    session = self.client.session
    session[SESSION_KEY_PLAN] = stored_plan
    session.save()

    # Create a device that already uses the same number.
    SMSDevice.objects.create(phone_number="1234", user=self.user, stage=self.stage)

    send_stub = MagicMock()
    with patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        send_stub,
    ):
        challenge = self.client.get(executor_url)

    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component="ak-stage-authenticator-sms",
        phone_number_required=True,
    )
    reloaded_plan: FlowPlan = self.client.session[SESSION_KEY_PLAN]
    self.assertEqual(reloaded_plan.context[PLAN_CONTEXT_PROMPT], {})
test stage context data (phone number exists already)
def
test_stage_submit_full(self):
def test_stage_submit_full(self):
    """Submit a phone number, then a code, and expect a redirect to the root view."""
    component = "ak-stage-authenticator-sms"
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    self.client.get(configure_url)
    challenge = self.client.get(executor_url)
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component=component,
        phone_number_required=True,
    )

    # Step 1: submit the phone number with sending mocked.
    send_stub = MagicMock()
    with patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        send_stub,
    ):
        number_response = self.client.post(
            executor_url,
            data={"component": component, "phone_number": "foo"},
        )
    self.assertEqual(number_response.status_code, 200)
    send_stub.assert_called_once()
    self.assertStageResponse(
        number_response,
        self.flow,
        self.user,
        component=component,
        response_errors={},
        phone_number_required=False,
    )

    # Step 2: submit a code with token verification forced to return True.
    code_payload = {
        "component": component,
        "phone_number": "foo",
        "code": "123456",
    }
    with patch(
        "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
        MagicMock(return_value=True),
    ):
        code_response = self.client.post(executor_url, data=code_payload)
    self.assertEqual(code_response.status_code, 200)
    self.assertStageRedirects(code_response, reverse("authentik_core:root-redirect"))
test stage (submit)
def
test_stage_hash(self):
def test_stage_hash(self):
    """With `verify_only` enabled, the device created by enrollment is hashed."""
    self.stage.verify_only = True
    self.stage.save()
    self.client.get(
        reverse("authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}),
    )
    response = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
    )
    self.assertStageResponse(
        response,
        self.flow,
        self.user,
        component="ak-stage-authenticator-sms",
        phone_number_required=True,
    )
    # Submit the phone number with sending mocked.
    sms_send_mock = MagicMock()
    with patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        sms_send_mock,
    ):
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-sms", "phone_number": "foo"},
        )
        self.assertEqual(response.status_code, 200)
        sms_send_mock.assert_called_once()
    self.assertStageResponse(
        response,
        self.flow,
        self.user,
        component="ak-stage-authenticator-sms",
        response_errors={},
        phone_number_required=False,
    )
    # Submit a code with token verification forced to return True.
    with patch(
        "authentik.stages.authenticator_sms.models.SMSDevice.verify_token",
        MagicMock(return_value=True),
    ):
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={
                "component": "ak-stage-authenticator-sms",
                "phone_number": "foo",
                "code": "123456",
            },
        )
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
    device: SMSDevice = SMSDevice.objects.filter(user=self.user).first()
    # first() returns None when nothing matches; fail with a clear assertion
    # instead of an AttributeError on the next line.
    self.assertIsNotNone(device)
    self.assertTrue(device.is_hashed)
test stage (verify_only)
def
test_stage_hash_twice(self):
def test_stage_hash_twice(self):
    """With `verify_only`, a number whose hash already has a device is rejected."""
    # A different user already owns a device with the hashed number "foo".
    other_user = create_test_admin_user()
    SMSDevice.objects.create(
        user=other_user,
        stage=self.stage,
        phone_number=hash_phone_number("foo"),
    )
    self.stage.verify_only = True
    self.stage.save()

    component = "ak-stage-authenticator-sms"
    configure_url = reverse(
        "authentik_flows:configure", kwargs={"stage_uuid": self.stage.stage_uuid}
    )
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})

    self.client.get(configure_url)
    challenge = self.client.get(executor_url)
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component=component,
        phone_number_required=True,
    )

    send_stub = MagicMock()
    with patch(
        "authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send",
        send_stub,
    ):
        submitted = self.client.post(
            executor_url,
            data={"component": component, "phone_number": "foo"},
        )
    self.assertEqual(submitted.status_code, 200)
    expected_errors = {
        "non_field_errors": [{"code": "invalid", "string": "Invalid phone number"}]
    }
    self.assertStageResponse(
        submitted,
        self.flow,
        self.user,
        component=component,
        response_errors=expected_errors,
        phone_number_required=False,
    )
test stage (hash + duplicate)
class
TestSMSDeviceThrottling(authentik.stages.authenticator.tests.ThrottlingTestMixin, django.test.testcases.TestCase):
class TestSMSDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Test ThrottlingMixin behaviour on SMSDevice.verify_token"""

    def setUp(self):
        """Create an SMS device on a generic-provider stage and generate its token."""
        super().setUp()
        flow = create_test_flow()
        user = create_test_admin_user()
        # NOTE(review): the other fixture in this module passes `configure_flow=`;
        # confirm `flow=` is the intended keyword here.
        stage_options = {
            "flow": flow,
            "name": "sms-throttle",
            "provider": SMSProviders.GENERIC,
            "from_number": "1234",
        }
        stage = AuthenticatorSMSStage.objects.create(**stage_options)
        device = SMSDevice.objects.create(user=user, stage=stage, phone_number="+15551230001")
        self.device = device
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device."""
        return self.device.token

    def invalid_token(self):
        """Return a six-digit string that differs from the device's current token."""
        if self.device.token == "000000":
            return "111111"
        return "000000"
Test ThrottlingMixin behaviour on SMSDevice.verify_token
def
setUp(self):
def setUp(self):
    """Create an SMS device on a generic-provider stage and generate its token."""
    super().setUp()
    flow = create_test_flow()
    user = create_test_admin_user()
    # NOTE(review): the other fixture in this module passes `configure_flow=`;
    # confirm `flow=` is the intended keyword here.
    stage_options = {
        "flow": flow,
        "name": "sms-throttle",
        "provider": SMSProviders.GENERIC,
        "from_number": "1234",
    }
    stage = AuthenticatorSMSStage.objects.create(**stage_options)
    device = SMSDevice.objects.create(user=user, stage=stage, phone_number="+15551230001")
    self.device = device
    self.device.generate_token()
Hook method for setting up the test fixture before exercising it.