authentik.stages.authenticator_email.tests

Test Email Authenticator API

  1"""Test Email Authenticator API"""
  2
  3from datetime import timedelta
  4from unittest.mock import PropertyMock, patch
  5
  6from django.core import mail
  7from django.core.mail.backends.locmem import EmailBackend
  8from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
  9from django.db.utils import IntegrityError
 10from django.template.exceptions import TemplateDoesNotExist
 11from django.test import TestCase
 12from django.urls import reverse
 13from django.utils.timezone import now
 14
 15from authentik.core.tests.utils import create_test_flow, create_test_user
 16from authentik.flows.models import FlowStageBinding
 17from authentik.flows.tests import FlowTestCase
 18from authentik.lib.config import CONFIG
 19from authentik.lib.utils.email import mask_email
 20from authentik.stages.authenticator.tests import ThrottlingTestMixin
 21from authentik.stages.authenticator_email.api import (
 22    AuthenticatorEmailStageSerializer,
 23    EmailDeviceSerializer,
 24)
 25from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
 26from authentik.stages.authenticator_email.stage import PLAN_CONTEXT_EMAIL_DEVICE
 27from authentik.stages.email.utils import TemplateEmailMessage
 28
 29
 30class TestAuthenticatorEmailStage(FlowTestCase):
 31    """Test Email Authenticator stage"""
 32
 33    def setUp(self):
 34        super().setUp()
 35        self.flow = create_test_flow()
 36        self.user = create_test_user()
 37        self.user_noemail = create_test_user(email="")
 38        self.stage = AuthenticatorEmailStage.objects.create(
 39            name="email-authenticator",
 40            use_global_settings=True,
 41            from_address="test@authentik.local",
 42            configure_flow=self.flow,
 43            token_expiry="minutes=30",
 44        )  # nosec
 45        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 46        self.device = EmailDevice.objects.create(
 47            user=self.user,
 48            stage=self.stage,
 49            email="test@authentik.local",
 50        )
 51        self.client.force_login(self.user)
 52
 53    def test_device_str(self):
 54        """Test string representation of device"""
 55        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
 56        # Test unsaved device
 57        unsaved_device = EmailDevice(
 58            user=self.user,
 59            stage=self.stage,
 60            email="test@authentik.local",
 61        )
 62        self.assertEqual(str(unsaved_device), "New Email Device")
 63
 64    def test_stage_str(self):
 65        """Test string representation of stage"""
 66        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")
 67
 68    def test_token_lifecycle(self):
 69        """Test token generation, validation and expiry"""
 70        # Initially no token
 71        self.assertIsNone(self.device.token)
 72
 73        # Generate token
 74        self.device.generate_token()
 75        token = self.device.token
 76        self.assertIsNotNone(token)
 77        self.assertIsNotNone(self.device.valid_until)
 78        self.assertTrue(self.device.valid_until > now())
 79
 80        # Verify invalid token
 81        self.assertFalse(self.device.verify_token("000000"))
 82
 83        # Verify correct token (should clear token after verification)
 84        self.device.throttle_reset(commit=False)
 85        self.assertTrue(self.device.verify_token(token))
 86        self.assertIsNone(self.device.token)
 87
 88    @patch(
 89        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 90        PropertyMock(return_value=EmailBackend),
 91    )
 92    def test_stage_no_prefill(self):
 93        """Test stage without prefilled email"""
 94        self.client.force_login(self.user_noemail)
 95        response = self.client.get(
 96            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 97        )
 98        self.assertStageResponse(
 99            response,
100            self.flow,
101            self.user_noemail,
102            component="ak-stage-authenticator-email",
103            email_required=True,
104        )
105
106    @patch(
107        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
108        PropertyMock(return_value=EmailBackend),
109    )
110    def test_stage_submit(self):
111        """Test stage email submission"""
112        # test fail because of existing device
113        response = self.client.get(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
115        )
116        self.assertStageResponse(
117            response,
118            self.flow,
119            self.user,
120            component="ak-stage-access-denied",
121        )
122        self.device.delete()
123        # Initialize the flow
124        response = self.client.get(
125            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
126        )
127        self.assertStageResponse(
128            response,
129            self.flow,
130            self.user,
131            component="ak-stage-authenticator-email",
132            email_required=False,
133        )
134
135        response = self.client.post(
136            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
137            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
138        )
139        self.assertEqual(response.status_code, 200)
140        self.assertEqual(len(mail.outbox), 2)
141        sent_mail = mail.outbox[1]
142        self.assertEqual(sent_mail.subject, self.stage.subject)
143        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
144        # Get from_address from global email config to test if global settings are being used
145        from_address_global = CONFIG.get("email.from")
146        self.assertEqual(sent_mail.from_email, from_address_global)
147
148        self.assertStageResponse(
149            response,
150            self.flow,
151            self.user,
152            component="ak-stage-authenticator-email",
153            response_errors={},
154            email_required=False,
155        )
156
157    def test_email_template(self):
158        """Test email template rendering"""
159        self.device.generate_token()
160        message = self.device._compose_email()
161
162        self.assertIsInstance(message, TemplateEmailMessage)
163        self.assertEqual(message.subject, self.stage.subject)
164        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
165        self.assertTrue(self.device.token in message.body)
166
167    def test_duplicate_email(self):
168        """Test attempting to use same email twice"""
169        email = "test2@authentik.local"
170        # First device
171        EmailDevice.objects.create(
172            user=self.user,
173            stage=self.stage,
174            email=email,
175        )
176        # Attempt to create second device with same email
177        with self.assertRaises(IntegrityError):
178            EmailDevice.objects.create(
179                user=self.user,
180                stage=self.stage,
181                email=email,
182            )
183
184    def test_token_expiry(self):
185        """Test token expiration behavior"""
186        self.device.generate_token()
187        token = self.device.token
188        # Set token as expired
189        self.device.valid_until = now() - timedelta(minutes=1)
190        self.device.save()
191        # Verify expired token fails
192        self.assertFalse(self.device.verify_token(token))
193
194    def test_template_errors(self):
195        """Test handling of template errors"""
196        self.stage.template = "{% invalid template %}"
197        with self.assertRaises(TemplateDoesNotExist):
198            self.stage.send(self.device)
199
200    @patch(
201        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
202        PropertyMock(return_value=EmailBackend),
203    )
204    def test_challenge_response_validation(self):
205        """Test challenge response validation"""
206        # Initialize the flow
207        self.client.force_login(self.user_noemail)
208        response = self.client.get(
209            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
210        )
211
212        # Test missing code and email
213        response = self.client.post(
214            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
215            data={"component": "ak-stage-authenticator-email"},
216        )
217        self.assertStageResponse(
218            response,
219            self.flow,
220            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
221        )
222
223        # Test invalid code
224        response = self.client.post(
225            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
226            data={"component": "ak-stage-authenticator-email", "code": "000000"},
227        )
228        self.assertStageResponse(
229            response,
230            self.flow,
231            response_errors={
232                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
233            },
234        )
235
236        # Test valid code
237        device = self.device
238        token = device.token
239        response = self.client.post(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
241            data={"component": "ak-stage-authenticator-email", "code": token},
242        )
243        self.assertEqual(response.status_code, 200)
244        self.assertTrue(device.confirmed)
245
246    @patch(
247        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
248        PropertyMock(return_value=EmailBackend),
249    )
250    def test_challenge_generation(self):
251        """Test challenge generation"""
252        # Test with masked email
253        self.device.delete()
254        response = self.client.get(
255            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
256        )
257        self.assertStageResponse(
258            response,
259            self.flow,
260            self.user,
261            component="ak-stage-authenticator-email",
262            email_required=False,
263        )
264        masked_email = mask_email(self.user.email)
265        self.assertEqual(masked_email, response.json()["email"])
266        self.client.logout()
267
268        # Test without email
269        self.client.force_login(self.user_noemail)
270        response = self.client.get(
271            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
272        )
273        self.assertStageResponse(
274            response,
275            self.flow,
276            self.user_noemail,
277            component="ak-stage-authenticator-email",
278            email_required=True,
279        )
280        self.assertIsNone(response.json()["email"])
281
282    @patch(
283        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
284        PropertyMock(return_value=EmailBackend),
285    )
286    def test_session_management(self):
287        """Test session device management"""
288        # Test device creation in session
289        # Delete any existing devices for this test
290        EmailDevice.objects.filter(user=self.user).delete()
291
292        response = self.client.get(
293            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
294        )
295        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
296        self.assertIsInstance(device, EmailDevice)
297        self.assertFalse(device.confirmed)
298        self.assertEqual(device.user, self.user)
299
300        # Test device confirmation and cleanup
301        device.confirmed = True
302        device.email = "new_test@authentik.local"  # Use a different email
303        response = self.client.post(
304            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
305            data={"component": "ak-stage-authenticator-email", "code": device.token},
306        )
307        self.assertEqual(response.status_code, 200)
308
309    def test_model_properties_and_methods(self):
310        """Test model properties"""
311        device = self.device
312        stage = self.stage
313
314        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
315        self.assertIsInstance(stage.backend, SMTPEmailBackend)
316        self.assertEqual(device.serializer, EmailDeviceSerializer)
317
318        # Test AuthenticatorEmailStage send method
319        self.device.generate_token()
320        # Test EmailDevice _compose_email method
321        message = self.device._compose_email()
322        self.assertIsInstance(message, TemplateEmailMessage)
323        self.assertEqual(message.subject, self.stage.subject)
324        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
325        self.assertTrue(self.device.token in message.body)
326
327    @patch(
328        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
329        PropertyMock(return_value=EmailBackend),
330    )
331    def test_email_tasks(self):
332        # Test AuthenticatorEmailStage send method
333        self.stage.send(self.device)
334        self.assertEqual(len(mail.outbox), 1)
335
336
class TestEmailDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Throttling test case for EmailDevice

    Provides the device and the token helpers below; the test methods themselves
    are presumably supplied by ThrottlingTestMixin (not visible here).
    """

    def setUp(self):
        """Create a stage and an email device, then generate a token on the device"""
        super().setUp()
        configure_flow = create_test_flow()
        owner = create_test_user()
        email_stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=configure_flow,
            token_expiry="minutes=30",
        )  # nosec
        self.device = EmailDevice.objects.create(
            user=owner,
            stage=email_stage,
            email="throttle@authentik.local",
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device"""
        return self.device.token

    def invalid_token(self):
        """Return a six-digit string that differs from the device's current token"""
        if self.device.token == "000000":
            return "111111"
        return "000000"
class TestAuthenticatorEmailStage(authentik.flows.tests.FlowTestCase):
 31class TestAuthenticatorEmailStage(FlowTestCase):
 32    """Test Email Authenticator stage"""
 33
 34    def setUp(self):
 35        super().setUp()
 36        self.flow = create_test_flow()
 37        self.user = create_test_user()
 38        self.user_noemail = create_test_user(email="")
 39        self.stage = AuthenticatorEmailStage.objects.create(
 40            name="email-authenticator",
 41            use_global_settings=True,
 42            from_address="test@authentik.local",
 43            configure_flow=self.flow,
 44            token_expiry="minutes=30",
 45        )  # nosec
 46        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 47        self.device = EmailDevice.objects.create(
 48            user=self.user,
 49            stage=self.stage,
 50            email="test@authentik.local",
 51        )
 52        self.client.force_login(self.user)
 53
 54    def test_device_str(self):
 55        """Test string representation of device"""
 56        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
 57        # Test unsaved device
 58        unsaved_device = EmailDevice(
 59            user=self.user,
 60            stage=self.stage,
 61            email="test@authentik.local",
 62        )
 63        self.assertEqual(str(unsaved_device), "New Email Device")
 64
 65    def test_stage_str(self):
 66        """Test string representation of stage"""
 67        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")
 68
 69    def test_token_lifecycle(self):
 70        """Test token generation, validation and expiry"""
 71        # Initially no token
 72        self.assertIsNone(self.device.token)
 73
 74        # Generate token
 75        self.device.generate_token()
 76        token = self.device.token
 77        self.assertIsNotNone(token)
 78        self.assertIsNotNone(self.device.valid_until)
 79        self.assertTrue(self.device.valid_until > now())
 80
 81        # Verify invalid token
 82        self.assertFalse(self.device.verify_token("000000"))
 83
 84        # Verify correct token (should clear token after verification)
 85        self.device.throttle_reset(commit=False)
 86        self.assertTrue(self.device.verify_token(token))
 87        self.assertIsNone(self.device.token)
 88
 89    @patch(
 90        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 91        PropertyMock(return_value=EmailBackend),
 92    )
 93    def test_stage_no_prefill(self):
 94        """Test stage without prefilled email"""
 95        self.client.force_login(self.user_noemail)
 96        response = self.client.get(
 97            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 98        )
 99        self.assertStageResponse(
100            response,
101            self.flow,
102            self.user_noemail,
103            component="ak-stage-authenticator-email",
104            email_required=True,
105        )
106
107    @patch(
108        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
109        PropertyMock(return_value=EmailBackend),
110    )
111    def test_stage_submit(self):
112        """Test stage email submission"""
113        # test fail because of existing device
114        response = self.client.get(
115            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
116        )
117        self.assertStageResponse(
118            response,
119            self.flow,
120            self.user,
121            component="ak-stage-access-denied",
122        )
123        self.device.delete()
124        # Initialize the flow
125        response = self.client.get(
126            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
127        )
128        self.assertStageResponse(
129            response,
130            self.flow,
131            self.user,
132            component="ak-stage-authenticator-email",
133            email_required=False,
134        )
135
136        response = self.client.post(
137            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
138            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
139        )
140        self.assertEqual(response.status_code, 200)
141        self.assertEqual(len(mail.outbox), 2)
142        sent_mail = mail.outbox[1]
143        self.assertEqual(sent_mail.subject, self.stage.subject)
144        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
145        # Get from_address from global email config to test if global settings are being used
146        from_address_global = CONFIG.get("email.from")
147        self.assertEqual(sent_mail.from_email, from_address_global)
148
149        self.assertStageResponse(
150            response,
151            self.flow,
152            self.user,
153            component="ak-stage-authenticator-email",
154            response_errors={},
155            email_required=False,
156        )
157
158    def test_email_template(self):
159        """Test email template rendering"""
160        self.device.generate_token()
161        message = self.device._compose_email()
162
163        self.assertIsInstance(message, TemplateEmailMessage)
164        self.assertEqual(message.subject, self.stage.subject)
165        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
166        self.assertTrue(self.device.token in message.body)
167
168    def test_duplicate_email(self):
169        """Test attempting to use same email twice"""
170        email = "test2@authentik.local"
171        # First device
172        EmailDevice.objects.create(
173            user=self.user,
174            stage=self.stage,
175            email=email,
176        )
177        # Attempt to create second device with same email
178        with self.assertRaises(IntegrityError):
179            EmailDevice.objects.create(
180                user=self.user,
181                stage=self.stage,
182                email=email,
183            )
184
185    def test_token_expiry(self):
186        """Test token expiration behavior"""
187        self.device.generate_token()
188        token = self.device.token
189        # Set token as expired
190        self.device.valid_until = now() - timedelta(minutes=1)
191        self.device.save()
192        # Verify expired token fails
193        self.assertFalse(self.device.verify_token(token))
194
195    def test_template_errors(self):
196        """Test handling of template errors"""
197        self.stage.template = "{% invalid template %}"
198        with self.assertRaises(TemplateDoesNotExist):
199            self.stage.send(self.device)
200
201    @patch(
202        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
203        PropertyMock(return_value=EmailBackend),
204    )
205    def test_challenge_response_validation(self):
206        """Test challenge response validation"""
207        # Initialize the flow
208        self.client.force_login(self.user_noemail)
209        response = self.client.get(
210            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
211        )
212
213        # Test missing code and email
214        response = self.client.post(
215            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
216            data={"component": "ak-stage-authenticator-email"},
217        )
218        self.assertStageResponse(
219            response,
220            self.flow,
221            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
222        )
223
224        # Test invalid code
225        response = self.client.post(
226            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
227            data={"component": "ak-stage-authenticator-email", "code": "000000"},
228        )
229        self.assertStageResponse(
230            response,
231            self.flow,
232            response_errors={
233                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
234            },
235        )
236
237        # Test valid code
238        device = self.device
239        token = device.token
240        response = self.client.post(
241            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
242            data={"component": "ak-stage-authenticator-email", "code": token},
243        )
244        self.assertEqual(response.status_code, 200)
245        self.assertTrue(device.confirmed)
246
247    @patch(
248        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
249        PropertyMock(return_value=EmailBackend),
250    )
251    def test_challenge_generation(self):
252        """Test challenge generation"""
253        # Test with masked email
254        self.device.delete()
255        response = self.client.get(
256            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
257        )
258        self.assertStageResponse(
259            response,
260            self.flow,
261            self.user,
262            component="ak-stage-authenticator-email",
263            email_required=False,
264        )
265        masked_email = mask_email(self.user.email)
266        self.assertEqual(masked_email, response.json()["email"])
267        self.client.logout()
268
269        # Test without email
270        self.client.force_login(self.user_noemail)
271        response = self.client.get(
272            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
273        )
274        self.assertStageResponse(
275            response,
276            self.flow,
277            self.user_noemail,
278            component="ak-stage-authenticator-email",
279            email_required=True,
280        )
281        self.assertIsNone(response.json()["email"])
282
283    @patch(
284        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
285        PropertyMock(return_value=EmailBackend),
286    )
287    def test_session_management(self):
288        """Test session device management"""
289        # Test device creation in session
290        # Delete any existing devices for this test
291        EmailDevice.objects.filter(user=self.user).delete()
292
293        response = self.client.get(
294            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
295        )
296        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
297        self.assertIsInstance(device, EmailDevice)
298        self.assertFalse(device.confirmed)
299        self.assertEqual(device.user, self.user)
300
301        # Test device confirmation and cleanup
302        device.confirmed = True
303        device.email = "new_test@authentik.local"  # Use a different email
304        response = self.client.post(
305            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
306            data={"component": "ak-stage-authenticator-email", "code": device.token},
307        )
308        self.assertEqual(response.status_code, 200)
309
310    def test_model_properties_and_methods(self):
311        """Test model properties"""
312        device = self.device
313        stage = self.stage
314
315        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
316        self.assertIsInstance(stage.backend, SMTPEmailBackend)
317        self.assertEqual(device.serializer, EmailDeviceSerializer)
318
319        # Test AuthenticatorEmailStage send method
320        self.device.generate_token()
321        # Test EmailDevice _compose_email method
322        message = self.device._compose_email()
323        self.assertIsInstance(message, TemplateEmailMessage)
324        self.assertEqual(message.subject, self.stage.subject)
325        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
326        self.assertTrue(self.device.token in message.body)
327
328    @patch(
329        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
330        PropertyMock(return_value=EmailBackend),
331    )
332    def test_email_tasks(self):
333        # Test AuthenticatorEmailStage send method
334        self.stage.send(self.device)
335        self.assertEqual(len(mail.outbox), 1)

Test Email Authenticator stage

def setUp(self):
34    def setUp(self):
35        super().setUp()
36        self.flow = create_test_flow()
37        self.user = create_test_user()
38        self.user_noemail = create_test_user(email="")
39        self.stage = AuthenticatorEmailStage.objects.create(
40            name="email-authenticator",
41            use_global_settings=True,
42            from_address="test@authentik.local",
43            configure_flow=self.flow,
44            token_expiry="minutes=30",
45        )  # nosec
46        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
47        self.device = EmailDevice.objects.create(
48            user=self.user,
49            stage=self.stage,
50            email="test@authentik.local",
51        )
52        self.client.force_login(self.user)

Hook method for setting up the test fixture before exercising it.

def test_device_str(self):
54    def test_device_str(self):
55        """Test string representation of device"""
56        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
57        # Test unsaved device
58        unsaved_device = EmailDevice(
59            user=self.user,
60            stage=self.stage,
61            email="test@authentik.local",
62        )
63        self.assertEqual(str(unsaved_device), "New Email Device")

Test string representation of device

def test_stage_str(self):
65    def test_stage_str(self):
66        """Test string representation of stage"""
67        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")

Test string representation of stage

def test_token_lifecycle(self):
69    def test_token_lifecycle(self):
70        """Test token generation, validation and expiry"""
71        # Initially no token
72        self.assertIsNone(self.device.token)
73
74        # Generate token
75        self.device.generate_token()
76        token = self.device.token
77        self.assertIsNotNone(token)
78        self.assertIsNotNone(self.device.valid_until)
79        self.assertTrue(self.device.valid_until > now())
80
81        # Verify invalid token
82        self.assertFalse(self.device.verify_token("000000"))
83
84        # Verify correct token (should clear token after verification)
85        self.device.throttle_reset(commit=False)
86        self.assertTrue(self.device.verify_token(token))
87        self.assertIsNone(self.device.token)

Test token generation, validation and expiry

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_stage_no_prefill(self):
 89    @patch(
 90        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 91        PropertyMock(return_value=EmailBackend),
 92    )
 93    def test_stage_no_prefill(self):
 94        """Test stage without prefilled email"""
 95        self.client.force_login(self.user_noemail)
 96        response = self.client.get(
 97            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 98        )
 99        self.assertStageResponse(
100            response,
101            self.flow,
102            self.user_noemail,
103            component="ak-stage-authenticator-email",
104            email_required=True,
105        )

Test stage without prefilled email

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_stage_submit(self):
107    @patch(
108        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
109        PropertyMock(return_value=EmailBackend),
110    )
111    def test_stage_submit(self):
112        """Test stage email submission"""
113        # test fail because of existing device
114        response = self.client.get(
115            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
116        )
117        self.assertStageResponse(
118            response,
119            self.flow,
120            self.user,
121            component="ak-stage-access-denied",
122        )
123        self.device.delete()
124        # Initialize the flow
125        response = self.client.get(
126            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
127        )
128        self.assertStageResponse(
129            response,
130            self.flow,
131            self.user,
132            component="ak-stage-authenticator-email",
133            email_required=False,
134        )
135
136        response = self.client.post(
137            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
138            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
139        )
140        self.assertEqual(response.status_code, 200)
141        self.assertEqual(len(mail.outbox), 2)
142        sent_mail = mail.outbox[1]
143        self.assertEqual(sent_mail.subject, self.stage.subject)
144        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
145        # Get from_address from global email config to test if global settings are being used
146        from_address_global = CONFIG.get("email.from")
147        self.assertEqual(sent_mail.from_email, from_address_global)
148
149        self.assertStageResponse(
150            response,
151            self.flow,
152            self.user,
153            component="ak-stage-authenticator-email",
154            response_errors={},
155            email_required=False,
156        )

Test stage email submission

def test_email_template(self):
158    def test_email_template(self):
159        """Test email template rendering"""
160        self.device.generate_token()
161        message = self.device._compose_email()
162
163        self.assertIsInstance(message, TemplateEmailMessage)
164        self.assertEqual(message.subject, self.stage.subject)
165        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
166        self.assertTrue(self.device.token in message.body)

Test email template rendering

def test_duplicate_email(self):
168    def test_duplicate_email(self):
169        """Test attempting to use same email twice"""
170        email = "test2@authentik.local"
171        # First device
172        EmailDevice.objects.create(
173            user=self.user,
174            stage=self.stage,
175            email=email,
176        )
177        # Attempt to create second device with same email
178        with self.assertRaises(IntegrityError):
179            EmailDevice.objects.create(
180                user=self.user,
181                stage=self.stage,
182                email=email,
183            )

Test attempting to use same email twice

def test_token_expiry(self):
185    def test_token_expiry(self):
186        """Test token expiration behavior"""
187        self.device.generate_token()
188        token = self.device.token
189        # Set token as expired
190        self.device.valid_until = now() - timedelta(minutes=1)
191        self.device.save()
192        # Verify expired token fails
193        self.assertFalse(self.device.verify_token(token))

Test token expiration behavior

def test_template_errors(self):
195    def test_template_errors(self):
196        """Test handling of template errors"""
197        self.stage.template = "{% invalid template %}"
198        with self.assertRaises(TemplateDoesNotExist):
199            self.stage.send(self.device)

Test handling of template errors

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_challenge_response_validation(self):
201    @patch(
202        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
203        PropertyMock(return_value=EmailBackend),
204    )
205    def test_challenge_response_validation(self):
206        """Test challenge response validation"""
207        # Initialize the flow
208        self.client.force_login(self.user_noemail)
209        response = self.client.get(
210            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
211        )
212
213        # Test missing code and email
214        response = self.client.post(
215            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
216            data={"component": "ak-stage-authenticator-email"},
217        )
218        self.assertStageResponse(
219            response,
220            self.flow,
221            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
222        )
223
224        # Test invalid code
225        response = self.client.post(
226            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
227            data={"component": "ak-stage-authenticator-email", "code": "000000"},
228        )
229        self.assertStageResponse(
230            response,
231            self.flow,
232            response_errors={
233                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
234            },
235        )
236
237        # Test valid code
238        device = self.device
239        token = device.token
240        response = self.client.post(
241            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
242            data={"component": "ak-stage-authenticator-email", "code": token},
243        )
244        self.assertEqual(response.status_code, 200)
245        self.assertTrue(device.confirmed)

Test challenge response validation

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_challenge_generation(self):
247    @patch(
248        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
249        PropertyMock(return_value=EmailBackend),
250    )
251    def test_challenge_generation(self):
252        """Test challenge generation"""
253        # Test with masked email
254        self.device.delete()
255        response = self.client.get(
256            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
257        )
258        self.assertStageResponse(
259            response,
260            self.flow,
261            self.user,
262            component="ak-stage-authenticator-email",
263            email_required=False,
264        )
265        masked_email = mask_email(self.user.email)
266        self.assertEqual(masked_email, response.json()["email"])
267        self.client.logout()
268
269        # Test without email
270        self.client.force_login(self.user_noemail)
271        response = self.client.get(
272            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
273        )
274        self.assertStageResponse(
275            response,
276            self.flow,
277            self.user_noemail,
278            component="ak-stage-authenticator-email",
279            email_required=True,
280        )
281        self.assertIsNone(response.json()["email"])

Test challenge generation

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_session_management(self):
283    @patch(
284        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
285        PropertyMock(return_value=EmailBackend),
286    )
287    def test_session_management(self):
288        """Test session device management"""
289        # Test device creation in session
290        # Delete any existing devices for this test
291        EmailDevice.objects.filter(user=self.user).delete()
292
293        response = self.client.get(
294            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
295        )
296        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
297        self.assertIsInstance(device, EmailDevice)
298        self.assertFalse(device.confirmed)
299        self.assertEqual(device.user, self.user)
300
301        # Test device confirmation and cleanup
302        device.confirmed = True
303        device.email = "new_test@authentik.local"  # Use a different email
304        response = self.client.post(
305            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
306            data={"component": "ak-stage-authenticator-email", "code": device.token},
307        )
308        self.assertEqual(response.status_code, 200)

Test session device management

def test_model_properties_and_methods(self):
310    def test_model_properties_and_methods(self):
311        """Test model properties"""
312        device = self.device
313        stage = self.stage
314
315        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
316        self.assertIsInstance(stage.backend, SMTPEmailBackend)
317        self.assertEqual(device.serializer, EmailDeviceSerializer)
318
319        # Test AuthenticatorEmailStage send method
320        self.device.generate_token()
321        # Test EmailDevice _compose_email method
322        message = self.device._compose_email()
323        self.assertIsInstance(message, TemplateEmailMessage)
324        self.assertEqual(message.subject, self.stage.subject)
325        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
326        self.assertTrue(self.device.token in message.body)

Test model properties

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_email_tasks(self):
328    @patch(
329        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
330        PropertyMock(return_value=EmailBackend),
331    )
332    def test_email_tasks(self):
333        # Test AuthenticatorEmailStage send method
334        self.stage.send(self.device)
335        self.assertEqual(len(mail.outbox), 1)
class TestEmailDeviceThrottling(authentik.stages.authenticator.tests.ThrottlingTestMixin, django.test.testcases.TestCase):
class TestEmailDeviceThrottling(ThrottlingTestMixin, TestCase):
    """Runs the ThrottlingTestMixin tests against an EmailDevice"""

    def setUp(self):
        """Create a stage and an EmailDevice, then generate a token on the device"""
        super().setUp()
        configure_flow = create_test_flow()
        owner = create_test_user()
        throttle_stage = AuthenticatorEmailStage.objects.create(
            name="email-authenticator-throttle",
            use_global_settings=True,
            from_address="test@authentik.local",
            configure_flow=configure_flow,
            token_expiry="minutes=30",
        )  # nosec
        self.device = EmailDevice.objects.create(
            user=owner,
            stage=throttle_stage,
            email="throttle@authentik.local",
        )
        self.device.generate_token()

    def valid_token(self):
        """Return the token currently stored on the device under test"""
        current_token = self.device.token
        return current_token

    def invalid_token(self):
        """Return a code guaranteed to differ from the device's current token"""
        if self.device.token == "000000":
            return "111111"
        return "000000"

Generic tests for throttled devices.

Any concrete device implementation that uses throttling should define a TestCase subclass that includes this as a base class. This will help verify a correct integration of ThrottlingMixin.

Subclasses are responsible for populating self.device with a device to test as well as implementing methods to generate tokens to test with.

def setUp(self):
339    def setUp(self):
340        super().setUp()
341        flow = create_test_flow()
342        user = create_test_user()
343        stage = AuthenticatorEmailStage.objects.create(
344            name="email-authenticator-throttle",
345            use_global_settings=True,
346            from_address="test@authentik.local",
347            configure_flow=flow,
348            token_expiry="minutes=30",
349        )  # nosec
350        self.device = EmailDevice.objects.create(
351            user=user, stage=stage, email="throttle@authentik.local"
352        )
353        self.device.generate_token()

Hook method for setting up the test fixture before exercising it.

def valid_token(self):
355    def valid_token(self):
356        return self.device.token

Returns a valid token to pass to our device under test.

def invalid_token(self):
358    def invalid_token(self):
359        return "000000" if self.device.token != "000000" else "111111"

Returns an invalid token to pass to our device under test.