authentik.stages.authenticator_email.tests
Test Email Authenticator API
"""Test Email Authenticator API"""

from datetime import timedelta
from unittest.mock import PropertyMock, patch

from django.core import mail
from django.core.mail.backends.locmem import EmailBackend
from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
from django.db.utils import IntegrityError
from django.template.exceptions import TemplateDoesNotExist
from django.test import TestCase
from django.urls import reverse
from django.utils.timezone import now

from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.flows.models import FlowStageBinding
from authentik.flows.tests import FlowTestCase
from authentik.lib.config import CONFIG
from authentik.lib.utils.email import mask_email
from authentik.stages.authenticator.tests import ThrottlingTestMixin
from authentik.stages.authenticator_email.api import (
    AuthenticatorEmailStageSerializer,
    EmailDeviceSerializer,
)
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
from authentik.stages.authenticator_email.stage import PLAN_CONTEXT_EMAIL_DEVICE
from authentik.stages.email.utils import TemplateEmailMessage


class TestAuthenticatorEmailStage(FlowTestCase):
    """Test Email Authenticator stage"""

    def setUp(self):
        """Create a flow, two users, the stage bound to the flow and one device"""
        super().setUp()
        self.flow = create_test_flow()
        self.user = create_test_user()
        # Second user with an empty email address, for the "email required" paths
        self.user_noemail = create_test_user(email="")
        self.stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=self.flow,
            token_expiry="minutes=30",
        )  # nosec
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.device = EmailDevice.objects.create(
            user=self.user,
            stage=self.stage,
            email="test@authentik.local",
        )
        self.client.force_login(self.user)

    def test_device_str(self):
        """Test string representation of device"""
        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
        # Test unsaved device
        unsaved_device = EmailDevice(
            user=self.user,
            stage=self.stage,
            email="test@authentik.local",
        )
        self.assertEqual(str(unsaved_device), "New Email Device")

    def test_stage_str(self):
        """Test string representation of stage"""
        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")

    def test_token_lifecycle(self):
        """Test token generation, rejection of a wrong token and consumption of the right one"""
        # Initially no token
        self.assertIsNone(self.device.token)

        # Generate token
        self.device.generate_token()
        token = self.device.token
        self.assertIsNotNone(token)
        self.assertIsNotNone(self.device.valid_until)
        # assertGreater reports both timestamps when the check fails
        self.assertGreater(self.device.valid_until, now())

        # Verify invalid token; make sure the probe can never equal the real token
        wrong_token = "000000" if token != "000000" else "111111"
        self.assertFalse(self.device.verify_token(wrong_token))

        # Verify correct token (should clear token after verification)
        # NOTE(review): presumably the failed attempt above engaged throttling,
        # which is why it is reset before the valid attempt - confirm
        self.device.throttle_reset(commit=False)
        self.assertTrue(self.device.verify_token(token))
        self.assertIsNone(self.device.token)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_stage_no_prefill(self):
        """Test stage without prefilled email"""
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user_noemail,
            component="ak-stage-authenticator-email",
            email_required=True,
        )

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_stage_submit(self):
        """Test stage email submission"""
        # test fail because of existing device
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-access-denied",
        )
        self.device.delete()
        # Initialize the flow
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            email_required=False,
        )

        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(len(mail.outbox), 2)
        sent_mail = mail.outbox[1]
        self.assertEqual(sent_mail.subject, self.stage.subject)
        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
        # Get from_address from global email config to test if global settings are being used
        from_address_global = CONFIG.get("email.from")
        self.assertEqual(sent_mail.from_email, from_address_global)

        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            response_errors={},
            email_required=False,
        )

    def test_email_template(self):
        """Test email template rendering"""
        self.device.generate_token()
        message = self.device._compose_email()

        self.assertIsInstance(message, TemplateEmailMessage)
        self.assertEqual(message.subject, self.stage.subject)
        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
        # assertIn shows the token and the body when the check fails
        self.assertIn(self.device.token, message.body)

    def test_duplicate_email(self):
        """Test attempting to use same email twice"""
        email = "test2@authentik.local"
        # First device
        EmailDevice.objects.create(
            user=self.user,
            stage=self.stage,
            email=email,
        )
        # Attempt to create second device with same email
        with self.assertRaises(IntegrityError):
            EmailDevice.objects.create(
                user=self.user,
                stage=self.stage,
                email=email,
            )

    def test_token_expiry(self):
        """Test token expiration behavior"""
        self.device.generate_token()
        token = self.device.token
        # Set token as expired
        self.device.valid_until = now() - timedelta(minutes=1)
        self.device.save()
        # Verify expired token fails
        self.assertFalse(self.device.verify_token(token))

    def test_template_errors(self):
        """Test handling of template errors"""
        self.stage.template = "{% invalid template %}"
        with self.assertRaises(TemplateDoesNotExist):
            self.stage.send(self.device)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_challenge_response_validation(self):
        """Test challenge response validation"""
        # Initialize the flow
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )

        # Test missing code and email
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email"},
        )
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
        )

        # Test invalid code
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": "000000"},
        )
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={
                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
            },
        )

        # Test valid code
        # NOTE(review): self.device is the setUp fixture owned by self.user, while the
        # session here belongs to self.user_noemail, and setUp never calls
        # generate_token() on it - verify that `token` is really a valid code here.
        # NOTE(review): device.confirmed is read from the in-memory instance without
        # refresh_from_db(), so it may not reflect what the request did - confirm.
        device = self.device
        token = device.token
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": token},
        )
        self.assertEqual(response.status_code, 200)
        self.assertTrue(device.confirmed)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_challenge_generation(self):
        """Test challenge generation"""
        # Test with masked email
        self.device.delete()
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            email_required=False,
        )
        masked_email = mask_email(self.user.email)
        self.assertEqual(masked_email, response.json()["email"])
        self.client.logout()

        # Test without email
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user_noemail,
            component="ak-stage-authenticator-email",
            email_required=True,
        )
        self.assertIsNone(response.json()["email"])

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_session_management(self):
        """Test session device management"""
        # Test device creation in session
        # Delete any existing devices for this test
        EmailDevice.objects.filter(user=self.user).delete()

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
        self.assertIsInstance(device, EmailDevice)
        self.assertFalse(device.confirmed)
        self.assertEqual(device.user, self.user)

        # Test device confirmation and cleanup
        # NOTE(review): these attributes are set on the local object only and are
        # never saved; whether the flow plan sees them is not visible here - confirm
        device.confirmed = True
        device.email = "new_test@authentik.local"  # Use a different email
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": device.token},
        )
        self.assertEqual(response.status_code, 200)

    def test_model_properties_and_methods(self):
        """Test model properties and email composition"""
        device = self.device
        stage = self.stage

        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
        self.assertIsInstance(stage.backend, SMTPEmailBackend)
        self.assertEqual(device.serializer, EmailDeviceSerializer)

        # A token must exist before the composed body can be checked for it
        device.generate_token()
        # Test EmailDevice _compose_email method
        message = device._compose_email()
        self.assertIsInstance(message, TemplateEmailMessage)
        self.assertEqual(message.subject, stage.subject)
        self.assertEqual(message.to, [f"{self.user.name} <{device.email}>"])
        # assertIn shows the token and the body when the check fails
        self.assertIn(device.token, message.body)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_email_tasks(self):
        """Test that sending through the stage produces exactly one outbox message"""
        # Test AuthenticatorEmailStage send method
        self.stage.send(self.device)
        self.assertEqual(len(mail.outbox), 1)


class TestEmailDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Run the generic ThrottlingTestMixin tests against an EmailDevice"""

    def setUp(self):
        """Create a stage and a device that already has a generated token"""
        super().setUp()
        flow = create_test_flow()
        user = create_test_user()
        stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=flow,
            token_expiry="minutes=30",
        )  # nosec
        self.device = EmailDevice.objects.create(
            user=user, stage=stage, email="throttle@authentik.local"
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device"""
        return self.device.token

    def invalid_token(self):
        """Return a code that is guaranteed to differ from the device's token"""
        return "000000" if self.device.token != "000000" else "111111"
class TestAuthenticatorEmailStage(FlowTestCase):
    """Test Email Authenticator stage"""

    def setUp(self):
        """Create a flow, two users, the stage bound to the flow and one device"""
        super().setUp()
        self.flow = create_test_flow()
        self.user = create_test_user()
        # Second user with an empty email address, for the "email required" paths
        self.user_noemail = create_test_user(email="")
        self.stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=self.flow,
            token_expiry="minutes=30",
        )  # nosec
        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
        self.device = EmailDevice.objects.create(
            user=self.user,
            stage=self.stage,
            email="test@authentik.local",
        )
        self.client.force_login(self.user)

    def test_device_str(self):
        """Test string representation of device"""
        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
        # Test unsaved device
        unsaved_device = EmailDevice(
            user=self.user,
            stage=self.stage,
            email="test@authentik.local",
        )
        self.assertEqual(str(unsaved_device), "New Email Device")

    def test_stage_str(self):
        """Test string representation of stage"""
        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")

    def test_token_lifecycle(self):
        """Test token generation, rejection of a wrong token and consumption of the right one"""
        # Initially no token
        self.assertIsNone(self.device.token)

        # Generate token
        self.device.generate_token()
        token = self.device.token
        self.assertIsNotNone(token)
        self.assertIsNotNone(self.device.valid_until)
        # assertGreater reports both timestamps when the check fails
        self.assertGreater(self.device.valid_until, now())

        # Verify invalid token; make sure the probe can never equal the real token
        wrong_token = "000000" if token != "000000" else "111111"
        self.assertFalse(self.device.verify_token(wrong_token))

        # Verify correct token (should clear token after verification)
        # NOTE(review): presumably the failed attempt above engaged throttling,
        # which is why it is reset before the valid attempt - confirm
        self.device.throttle_reset(commit=False)
        self.assertTrue(self.device.verify_token(token))
        self.assertIsNone(self.device.token)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_stage_no_prefill(self):
        """Test stage without prefilled email"""
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user_noemail,
            component="ak-stage-authenticator-email",
            email_required=True,
        )

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_stage_submit(self):
        """Test stage email submission"""
        # test fail because of existing device
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-access-denied",
        )
        self.device.delete()
        # Initialize the flow
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            email_required=False,
        )

        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
        )
        self.assertEqual(response.status_code, 200)
        self.assertEqual(len(mail.outbox), 2)
        sent_mail = mail.outbox[1]
        self.assertEqual(sent_mail.subject, self.stage.subject)
        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
        # Get from_address from global email config to test if global settings are being used
        from_address_global = CONFIG.get("email.from")
        self.assertEqual(sent_mail.from_email, from_address_global)

        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            response_errors={},
            email_required=False,
        )

    def test_email_template(self):
        """Test email template rendering"""
        self.device.generate_token()
        message = self.device._compose_email()

        self.assertIsInstance(message, TemplateEmailMessage)
        self.assertEqual(message.subject, self.stage.subject)
        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
        # assertIn shows the token and the body when the check fails
        self.assertIn(self.device.token, message.body)

    def test_duplicate_email(self):
        """Test attempting to use same email twice"""
        email = "test2@authentik.local"
        # First device
        EmailDevice.objects.create(
            user=self.user,
            stage=self.stage,
            email=email,
        )
        # Attempt to create second device with same email
        with self.assertRaises(IntegrityError):
            EmailDevice.objects.create(
                user=self.user,
                stage=self.stage,
                email=email,
            )

    def test_token_expiry(self):
        """Test token expiration behavior"""
        self.device.generate_token()
        token = self.device.token
        # Set token as expired
        self.device.valid_until = now() - timedelta(minutes=1)
        self.device.save()
        # Verify expired token fails
        self.assertFalse(self.device.verify_token(token))

    def test_template_errors(self):
        """Test handling of template errors"""
        self.stage.template = "{% invalid template %}"
        with self.assertRaises(TemplateDoesNotExist):
            self.stage.send(self.device)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_challenge_response_validation(self):
        """Test challenge response validation"""
        # Initialize the flow
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )

        # Test missing code and email
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email"},
        )
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
        )

        # Test invalid code
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": "000000"},
        )
        self.assertStageResponse(
            response,
            self.flow,
            response_errors={
                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
            },
        )

        # Test valid code
        # NOTE(review): self.device is the setUp fixture owned by self.user, while the
        # session here belongs to self.user_noemail, and setUp never calls
        # generate_token() on it - verify that `token` is really a valid code here.
        # NOTE(review): device.confirmed is read from the in-memory instance without
        # refresh_from_db(), so it may not reflect what the request did - confirm.
        device = self.device
        token = device.token
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": token},
        )
        self.assertEqual(response.status_code, 200)
        self.assertTrue(device.confirmed)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_challenge_generation(self):
        """Test challenge generation"""
        # Test with masked email
        self.device.delete()
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user,
            component="ak-stage-authenticator-email",
            email_required=False,
        )
        masked_email = mask_email(self.user.email)
        self.assertEqual(masked_email, response.json()["email"])
        self.client.logout()

        # Test without email
        self.client.force_login(self.user_noemail)
        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        self.assertStageResponse(
            response,
            self.flow,
            self.user_noemail,
            component="ak-stage-authenticator-email",
            email_required=True,
        )
        self.assertIsNone(response.json()["email"])

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_session_management(self):
        """Test session device management"""
        # Test device creation in session
        # Delete any existing devices for this test
        EmailDevice.objects.filter(user=self.user).delete()

        response = self.client.get(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        )
        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
        self.assertIsInstance(device, EmailDevice)
        self.assertFalse(device.confirmed)
        self.assertEqual(device.user, self.user)

        # Test device confirmation and cleanup
        # NOTE(review): these attributes are set on the local object only and are
        # never saved; whether the flow plan sees them is not visible here - confirm
        device.confirmed = True
        device.email = "new_test@authentik.local"  # Use a different email
        response = self.client.post(
            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
            data={"component": "ak-stage-authenticator-email", "code": device.token},
        )
        self.assertEqual(response.status_code, 200)

    def test_model_properties_and_methods(self):
        """Test model properties and email composition"""
        device = self.device
        stage = self.stage

        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
        self.assertIsInstance(stage.backend, SMTPEmailBackend)
        self.assertEqual(device.serializer, EmailDeviceSerializer)

        # A token must exist before the composed body can be checked for it
        device.generate_token()
        # Test EmailDevice _compose_email method
        message = device._compose_email()
        self.assertIsInstance(message, TemplateEmailMessage)
        self.assertEqual(message.subject, stage.subject)
        self.assertEqual(message.to, [f"{self.user.name} <{device.email}>"])
        # assertIn shows the token and the body when the check fails
        self.assertIn(device.token, message.body)

    @patch(
        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
        PropertyMock(return_value=EmailBackend),
    )
    def test_email_tasks(self):
        """Test that sending through the stage produces exactly one outbox message"""
        # Test AuthenticatorEmailStage send method
        self.stage.send(self.device)
        self.assertEqual(len(mail.outbox), 1)
Test Email Authenticator stage
def setUp(self):
    """Create a flow, two users, the stage bound to the flow and one device"""
    super().setUp()
    self.flow = create_test_flow()
    self.user = create_test_user()
    # Second user with an empty email address, for the "email required" paths
    self.user_noemail = create_test_user(email="")
    self.stage = AuthenticatorEmailStage.objects.create(
        name="email-authenticator",
        use_global_settings=True,
        from_address="test@authentik.local",
        configure_flow=self.flow,
        token_expiry="minutes=30",
    )  # nosec
    # Bind the stage as the first (order=0) stage of the flow
    self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
    # Device for self.user; no token is generated at this point
    self.device = EmailDevice.objects.create(
        user=self.user,
        stage=self.stage,
        email="test@authentik.local",
    )
    # Tests start authenticated as self.user unless they log in someone else
    self.client.force_login(self.user)
Hook method for setting up the test fixture before exercising it.
def test_device_str(self):
    """Check str() of the fixture device and of a device that was never saved"""
    persisted_label = str(self.device)
    self.assertEqual(persisted_label, f"Email Device for {self.user.pk}")

    # Built in memory only, nothing is written to the database
    draft = EmailDevice(user=self.user, stage=self.stage, email="test@authentik.local")
    draft_label = str(draft)
    self.assertEqual(draft_label, "New Email Device")
Test string representation of device
def test_stage_str(self):
    """Check that str() of the stage embeds the stage name"""
    expected_label = f"Email Authenticator Stage {self.stage.name}"
    self.assertEqual(str(self.stage), expected_label)
Test string representation of stage
def test_token_lifecycle(self):
    """Test token generation, rejection of a wrong token and consumption of the right one"""
    # Initially no token
    self.assertIsNone(self.device.token)

    # Generate token
    self.device.generate_token()
    token = self.device.token
    self.assertIsNotNone(token)
    self.assertIsNotNone(self.device.valid_until)
    # assertGreater reports both timestamps when the check fails
    self.assertGreater(self.device.valid_until, now())

    # Verify invalid token; make sure the probe can never equal the real token
    # (same guard as TestEmailDeviceThrottling.invalid_token)
    wrong_token = "000000" if token != "000000" else "111111"
    self.assertFalse(self.device.verify_token(wrong_token))

    # Verify correct token (should clear token after verification)
    # NOTE(review): presumably the failed attempt above engaged throttling,
    # which is why it is reset before the valid attempt - confirm
    self.device.throttle_reset(commit=False)
    self.assertTrue(self.device.verify_token(token))
    self.assertIsNone(self.device.token)
Test token generation, validation and expiry
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_stage_no_prefill(self):
    """Check that a user with an empty email address is asked to supply one"""
    self.client.force_login(self.user_noemail)
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    challenge = self.client.get(executor_url)
    # Fields the stage challenge is expected to carry
    expected_fields = {
        "component": "ak-stage-authenticator-email",
        "email_required": True,
    }
    self.assertStageResponse(challenge, self.flow, self.user_noemail, **expected_fields)
Test stage without prefilled email
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_stage_submit(self):
    """Submit an email address to the stage and inspect the message that is sent"""
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    stage_component = "ak-stage-authenticator-email"

    # With the fixture device still present the flow is denied
    denied = self.client.get(executor_url)
    self.assertStageResponse(denied, self.flow, self.user, component="ak-stage-access-denied")

    # Remove the device and start the flow again
    self.device.delete()
    challenge = self.client.get(executor_url)
    self.assertStageResponse(
        challenge,
        self.flow,
        self.user,
        component=stage_component,
        email_required=False,
    )

    payload = {"component": stage_component, "email": "test@example.com"}
    submitted = self.client.post(executor_url, data=payload)
    self.assertEqual(submitted.status_code, 200)
    self.assertEqual(len(mail.outbox), 2)

    latest_mail = mail.outbox[1]
    self.assertEqual(latest_mail.subject, self.stage.subject)
    self.assertEqual(latest_mail.to, [f"{self.user} <test@example.com>"])
    # The sender must come from the global email config, since the stage
    # is created with use_global_settings=True
    self.assertEqual(latest_mail.from_email, CONFIG.get("email.from"))

    self.assertStageResponse(
        submitted,
        self.flow,
        self.user,
        component=stage_component,
        response_errors={},
        email_required=False,
    )
Test stage email submission
def test_email_template(self):
    """Test email template rendering"""
    # The composed body is expected to contain the token, so create one first
    self.device.generate_token()
    message = self.device._compose_email()

    self.assertIsInstance(message, TemplateEmailMessage)
    self.assertEqual(message.subject, self.stage.subject)
    self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
    # assertIn shows the token and the body when the check fails,
    # unlike assertTrue(token in body)
    self.assertIn(self.device.token, message.body)
Test email template rendering
def test_duplicate_email(self):
    """Check that a second device with the same user, stage and email is rejected"""
    device_fields = {
        "user": self.user,
        "stage": self.stage,
        "email": "test2@authentik.local",
    }
    # The first insert goes through
    EmailDevice.objects.create(**device_fields)
    # An identical second insert must raise
    with self.assertRaises(IntegrityError):
        EmailDevice.objects.create(**device_fields)
Test attempting to use same email twice
def test_token_expiry(self):
    """Check that a token whose valid_until lies in the past is rejected"""
    self.device.generate_token()
    stale_token = self.device.token

    # Move the expiry one minute into the past and persist it
    one_minute_ago = now() - timedelta(minutes=1)
    self.device.valid_until = one_minute_ago
    self.device.save()

    accepted = self.device.verify_token(stale_token)
    self.assertFalse(accepted)
Test token expiration behavior
def test_template_errors(self):
    """Check that sending with an unusable template value raises TemplateDoesNotExist"""
    self.stage.template = "{% invalid template %}"
    # Callable form of assertRaises: invokes self.stage.send(self.device)
    self.assertRaises(TemplateDoesNotExist, self.stage.send, self.device)
Test handling of template errors
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_challenge_response_validation(self):
    """Test challenge response validation

    Runs as the user without an email address and posts, in order: an empty
    challenge response, a wrong code, and finally the fixture device's token."""
    # Initialize the flow
    self.client.force_login(self.user_noemail)
    # The response of this GET is not inspected; the request only starts the flow
    response = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
    )

    # Test missing code and email
    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        data={"component": "ak-stage-authenticator-email"},
    )
    self.assertStageResponse(
        response,
        self.flow,
        response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
    )

    # Test invalid code
    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        data={"component": "ak-stage-authenticator-email", "code": "000000"},
    )
    self.assertStageResponse(
        response,
        self.flow,
        response_errors={
            "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
        },
    )

    # Test valid code
    # NOTE(review): self.device is the setUp fixture owned by self.user, while the
    # session here belongs to self.user_noemail, and setUp never calls
    # generate_token() on it - verify that `token` is really a valid code here.
    # NOTE(review): device.confirmed is read from the in-memory instance without
    # refresh_from_db(), so it may not reflect what the request did - confirm.
    device = self.device
    token = device.token
    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        data={"component": "ak-stage-authenticator-email", "code": token},
    )
    self.assertEqual(response.status_code, 200)
    self.assertTrue(device.confirmed)
Test challenge response validation
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_challenge_generation(self):
    """Check the challenge for a user with an email address and for one without"""
    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    stage_component = "ak-stage-authenticator-email"

    # User with an email address: the challenge carries the masked address
    self.device.delete()
    with_email = self.client.get(executor_url)
    self.assertStageResponse(
        with_email,
        self.flow,
        self.user,
        component=stage_component,
        email_required=False,
    )
    expected_mask = mask_email(self.user.email)
    self.assertEqual(expected_mask, with_email.json()["email"])
    self.client.logout()

    # User without an email address: the challenge carries no address
    self.client.force_login(self.user_noemail)
    without_email = self.client.get(executor_url)
    self.assertStageResponse(
        without_email,
        self.flow,
        self.user_noemail,
        component=stage_component,
        email_required=True,
    )
    self.assertIsNone(without_email.json()["email"])
Test challenge generation
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_session_management(self):
    """Test session device management

    Starts the flow without any existing device, checks the device placed in
    the flow plan context, then posts that device's token as the code."""
    # Test device creation in session
    # Delete any existing devices for this test
    EmailDevice.objects.filter(user=self.user).delete()

    # The response of this GET is not inspected; only the flow plan is
    response = self.client.get(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
    )
    device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
    self.assertIsInstance(device, EmailDevice)
    self.assertFalse(device.confirmed)
    self.assertEqual(device.user, self.user)

    # Test device confirmation and cleanup
    # NOTE(review): these attributes are set on the local object only and are
    # never saved; whether the flow plan sees them is not visible here - confirm
    device.confirmed = True
    device.email = "new_test@authentik.local"  # Use a different email
    response = self.client.post(
        reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
        data={"component": "ak-stage-authenticator-email", "code": device.token},
    )
    # Only the HTTP status is checked; the resulting stage/device state is not
    self.assertEqual(response.status_code, 200)
Test session device management
def test_model_properties_and_methods(self):
    """Test model properties and email composition"""
    device = self.device
    stage = self.stage

    self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
    self.assertIsInstance(stage.backend, SMTPEmailBackend)
    self.assertEqual(device.serializer, EmailDeviceSerializer)

    # A token must exist before the composed body can be checked for it
    # (this step does not exercise the stage's send method)
    device.generate_token()
    # Test EmailDevice _compose_email method
    message = device._compose_email()
    self.assertIsInstance(message, TemplateEmailMessage)
    self.assertEqual(message.subject, stage.subject)
    self.assertEqual(message.to, [f"{self.user.name} <{device.email}>"])
    # assertIn shows the token and the body when the check fails,
    # unlike assertTrue(token in body)
    self.assertIn(device.token, message.body)
Test model properties
@patch(
    "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
    PropertyMock(return_value=EmailBackend),
)
def test_email_tasks(self):
    """Check that sending through the stage leaves exactly one message in the outbox"""
    # Exercise AuthenticatorEmailStage.send with the fixture device
    self.stage.send(self.device)
    delivered_count = len(mail.outbox)
    self.assertEqual(delivered_count, 1)
class TestEmailDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Run the generic ThrottlingTestMixin tests against an EmailDevice"""

    def setUp(self):
        """Create a stage and a device that already has a generated token"""
        super().setUp()
        throttle_flow = create_test_flow()
        throttle_user = create_test_user()
        throttle_stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=throttle_flow,
            token_expiry="minutes=30",
        )  # nosec
        self.device = EmailDevice.objects.create(
            user=throttle_user,
            stage=throttle_stage,
            email="throttle@authentik.local",
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device"""
        current_token = self.device.token
        return current_token

    def invalid_token(self):
        """Return a code that is guaranteed to differ from the device's token"""
        if self.device.token == "000000":
            return "111111"
        return "000000"
Generic tests for throttled devices.
Any concrete device implementation that uses throttling should define a TestCase subclass that includes this as a base class. This will help verify a correct integration of ThrottlingMixin.
Subclasses are responsible for populating self.device with a device to test as well as implementing methods to generate tokens to test with.
def setUp(self):
    """Create a stage and a device that already has a generated token"""
    super().setUp()
    flow = create_test_flow()
    user = create_test_user()
    stage = AuthenticatorEmailStage.objects.create(
        name="email-authenticator-throttle",
        use_global_settings=True,
        from_address="test@authentik.local",
        configure_flow=flow,
        token_expiry="minutes=30",
    )  # nosec
    # self.device is the device the inherited throttling tests operate on
    self.device = EmailDevice.objects.create(
        user=user, stage=stage, email="throttle@authentik.local"
    )
    # Generate a token up front so valid_token() has something to return
    self.device.generate_token()
Hook method for setting up the test fixture before exercising it.