authentik.stages.authenticator_email.tests

Test Email Authenticator API

  1"""Test Email Authenticator API"""
  2
  3from datetime import timedelta
  4from unittest.mock import PropertyMock, patch
  5
  6from django.core import mail
  7from django.core.mail.backends.locmem import EmailBackend
  8from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
  9from django.db.utils import IntegrityError
 10from django.template.exceptions import TemplateDoesNotExist
 11from django.urls import reverse
 12from django.utils.timezone import now
 13
 14from authentik.core.tests.utils import create_test_flow, create_test_user
 15from authentik.flows.models import FlowStageBinding
 16from authentik.flows.tests import FlowTestCase
 17from authentik.lib.config import CONFIG
 18from authentik.lib.utils.email import mask_email
 19from authentik.stages.authenticator_email.api import (
 20    AuthenticatorEmailStageSerializer,
 21    EmailDeviceSerializer,
 22)
 23from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
 24from authentik.stages.authenticator_email.stage import PLAN_CONTEXT_EMAIL_DEVICE
 25from authentik.stages.email.utils import TemplateEmailMessage
 26
 27
 28class TestAuthenticatorEmailStage(FlowTestCase):
 29    """Test Email Authenticator stage"""
 30
 31    def setUp(self):
 32        super().setUp()
 33        self.flow = create_test_flow()
 34        self.user = create_test_user()
 35        self.user_noemail = create_test_user(email="")
 36        self.stage = AuthenticatorEmailStage.objects.create(
 37            name="email-authenticator",
 38            use_global_settings=True,
 39            from_address="test@authentik.local",
 40            configure_flow=self.flow,
 41            token_expiry="minutes=30",
 42        )  # nosec
 43        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 44        self.device = EmailDevice.objects.create(
 45            user=self.user,
 46            stage=self.stage,
 47            email="test@authentik.local",
 48        )
 49        self.client.force_login(self.user)
 50
 51    def test_device_str(self):
 52        """Test string representation of device"""
 53        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
 54        # Test unsaved device
 55        unsaved_device = EmailDevice(
 56            user=self.user,
 57            stage=self.stage,
 58            email="test@authentik.local",
 59        )
 60        self.assertEqual(str(unsaved_device), "New Email Device")
 61
 62    def test_stage_str(self):
 63        """Test string representation of stage"""
 64        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")
 65
 66    def test_token_lifecycle(self):
 67        """Test token generation, validation and expiry"""
 68        # Initially no token
 69        self.assertIsNone(self.device.token)
 70
 71        # Generate token
 72        self.device.generate_token()
 73        token = self.device.token
 74        self.assertIsNotNone(token)
 75        self.assertIsNotNone(self.device.valid_until)
 76        self.assertTrue(self.device.valid_until > now())
 77
 78        # Verify invalid token
 79        self.assertFalse(self.device.verify_token("000000"))
 80
 81        # Verify correct token (should clear token after verification)
 82        self.assertTrue(self.device.verify_token(token))
 83        self.assertIsNone(self.device.token)
 84
 85    @patch(
 86        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 87        PropertyMock(return_value=EmailBackend),
 88    )
 89    def test_stage_no_prefill(self):
 90        """Test stage without prefilled email"""
 91        self.client.force_login(self.user_noemail)
 92        response = self.client.get(
 93            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 94        )
 95        self.assertStageResponse(
 96            response,
 97            self.flow,
 98            self.user_noemail,
 99            component="ak-stage-authenticator-email",
100            email_required=True,
101        )
102
103    @patch(
104        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
105        PropertyMock(return_value=EmailBackend),
106    )
107    def test_stage_submit(self):
108        """Test stage email submission"""
109        # test fail because of existing device
110        response = self.client.get(
111            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
112        )
113        self.assertStageResponse(
114            response,
115            self.flow,
116            self.user,
117            component="ak-stage-access-denied",
118        )
119        self.device.delete()
120        # Initialize the flow
121        response = self.client.get(
122            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
123        )
124        self.assertStageResponse(
125            response,
126            self.flow,
127            self.user,
128            component="ak-stage-authenticator-email",
129            email_required=False,
130        )
131
132        response = self.client.post(
133            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
134            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
135        )
136        self.assertEqual(response.status_code, 200)
137        self.assertEqual(len(mail.outbox), 2)
138        sent_mail = mail.outbox[1]
139        self.assertEqual(sent_mail.subject, self.stage.subject)
140        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
141        # Get from_address from global email config to test if global settings are being used
142        from_address_global = CONFIG.get("email.from")
143        self.assertEqual(sent_mail.from_email, from_address_global)
144
145        self.assertStageResponse(
146            response,
147            self.flow,
148            self.user,
149            component="ak-stage-authenticator-email",
150            response_errors={},
151            email_required=False,
152        )
153
154    def test_email_template(self):
155        """Test email template rendering"""
156        self.device.generate_token()
157        message = self.device._compose_email()
158
159        self.assertIsInstance(message, TemplateEmailMessage)
160        self.assertEqual(message.subject, self.stage.subject)
161        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
162        self.assertTrue(self.device.token in message.body)
163
164    def test_duplicate_email(self):
165        """Test attempting to use same email twice"""
166        email = "test2@authentik.local"
167        # First device
168        EmailDevice.objects.create(
169            user=self.user,
170            stage=self.stage,
171            email=email,
172        )
173        # Attempt to create second device with same email
174        with self.assertRaises(IntegrityError):
175            EmailDevice.objects.create(
176                user=self.user,
177                stage=self.stage,
178                email=email,
179            )
180
181    def test_token_expiry(self):
182        """Test token expiration behavior"""
183        self.device.generate_token()
184        token = self.device.token
185        # Set token as expired
186        self.device.valid_until = now() - timedelta(minutes=1)
187        self.device.save()
188        # Verify expired token fails
189        self.assertFalse(self.device.verify_token(token))
190
191    def test_template_errors(self):
192        """Test handling of template errors"""
193        self.stage.template = "{% invalid template %}"
194        with self.assertRaises(TemplateDoesNotExist):
195            self.stage.send(self.device)
196
197    @patch(
198        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
199        PropertyMock(return_value=EmailBackend),
200    )
201    def test_challenge_response_validation(self):
202        """Test challenge response validation"""
203        # Initialize the flow
204        self.client.force_login(self.user_noemail)
205        response = self.client.get(
206            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
207        )
208
209        # Test missing code and email
210        response = self.client.post(
211            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
212            data={"component": "ak-stage-authenticator-email"},
213        )
214        self.assertStageResponse(
215            response,
216            self.flow,
217            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
218        )
219
220        # Test invalid code
221        response = self.client.post(
222            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
223            data={"component": "ak-stage-authenticator-email", "code": "000000"},
224        )
225        self.assertStageResponse(
226            response,
227            self.flow,
228            response_errors={
229                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
230            },
231        )
232
233        # Test valid code
234        device = self.device
235        token = device.token
236        response = self.client.post(
237            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
238            data={"component": "ak-stage-authenticator-email", "code": token},
239        )
240        self.assertEqual(response.status_code, 200)
241        self.assertTrue(device.confirmed)
242
243    @patch(
244        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
245        PropertyMock(return_value=EmailBackend),
246    )
247    def test_challenge_generation(self):
248        """Test challenge generation"""
249        # Test with masked email
250        self.device.delete()
251        response = self.client.get(
252            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
253        )
254        self.assertStageResponse(
255            response,
256            self.flow,
257            self.user,
258            component="ak-stage-authenticator-email",
259            email_required=False,
260        )
261        masked_email = mask_email(self.user.email)
262        self.assertEqual(masked_email, response.json()["email"])
263        self.client.logout()
264
265        # Test without email
266        self.client.force_login(self.user_noemail)
267        response = self.client.get(
268            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
269        )
270        self.assertStageResponse(
271            response,
272            self.flow,
273            self.user_noemail,
274            component="ak-stage-authenticator-email",
275            email_required=True,
276        )
277        self.assertIsNone(response.json()["email"])
278
279    @patch(
280        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
281        PropertyMock(return_value=EmailBackend),
282    )
283    def test_session_management(self):
284        """Test session device management"""
285        # Test device creation in session
286        # Delete any existing devices for this test
287        EmailDevice.objects.filter(user=self.user).delete()
288
289        response = self.client.get(
290            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
291        )
292        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
293        self.assertIsInstance(device, EmailDevice)
294        self.assertFalse(device.confirmed)
295        self.assertEqual(device.user, self.user)
296
297        # Test device confirmation and cleanup
298        device.confirmed = True
299        device.email = "new_test@authentik.local"  # Use a different email
300        response = self.client.post(
301            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
302            data={"component": "ak-stage-authenticator-email", "code": device.token},
303        )
304        self.assertEqual(response.status_code, 200)
305
306    def test_model_properties_and_methods(self):
307        """Test model properties"""
308        device = self.device
309        stage = self.stage
310
311        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
312        self.assertIsInstance(stage.backend, SMTPEmailBackend)
313        self.assertEqual(device.serializer, EmailDeviceSerializer)
314
315        # Test AuthenticatorEmailStage send method
316        self.device.generate_token()
317        # Test EmailDevice _compose_email method
318        message = self.device._compose_email()
319        self.assertIsInstance(message, TemplateEmailMessage)
320        self.assertEqual(message.subject, self.stage.subject)
321        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
322        self.assertTrue(self.device.token in message.body)
323
324    @patch(
325        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
326        PropertyMock(return_value=EmailBackend),
327    )
328    def test_email_tasks(self):
329        # Test AuthenticatorEmailStage send method
330        self.stage.send(self.device)
331        self.assertEqual(len(mail.outbox), 1)
class TestAuthenticatorEmailStage(authentik.flows.tests.FlowTestCase):
 29class TestAuthenticatorEmailStage(FlowTestCase):
 30    """Test Email Authenticator stage"""
 31
 32    def setUp(self):
 33        super().setUp()
 34        self.flow = create_test_flow()
 35        self.user = create_test_user()
 36        self.user_noemail = create_test_user(email="")
 37        self.stage = AuthenticatorEmailStage.objects.create(
 38            name="email-authenticator",
 39            use_global_settings=True,
 40            from_address="test@authentik.local",
 41            configure_flow=self.flow,
 42            token_expiry="minutes=30",
 43        )  # nosec
 44        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
 45        self.device = EmailDevice.objects.create(
 46            user=self.user,
 47            stage=self.stage,
 48            email="test@authentik.local",
 49        )
 50        self.client.force_login(self.user)
 51
 52    def test_device_str(self):
 53        """Test string representation of device"""
 54        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
 55        # Test unsaved device
 56        unsaved_device = EmailDevice(
 57            user=self.user,
 58            stage=self.stage,
 59            email="test@authentik.local",
 60        )
 61        self.assertEqual(str(unsaved_device), "New Email Device")
 62
 63    def test_stage_str(self):
 64        """Test string representation of stage"""
 65        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")
 66
 67    def test_token_lifecycle(self):
 68        """Test token generation, validation and expiry"""
 69        # Initially no token
 70        self.assertIsNone(self.device.token)
 71
 72        # Generate token
 73        self.device.generate_token()
 74        token = self.device.token
 75        self.assertIsNotNone(token)
 76        self.assertIsNotNone(self.device.valid_until)
 77        self.assertTrue(self.device.valid_until > now())
 78
 79        # Verify invalid token
 80        self.assertFalse(self.device.verify_token("000000"))
 81
 82        # Verify correct token (should clear token after verification)
 83        self.assertTrue(self.device.verify_token(token))
 84        self.assertIsNone(self.device.token)
 85
 86    @patch(
 87        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 88        PropertyMock(return_value=EmailBackend),
 89    )
 90    def test_stage_no_prefill(self):
 91        """Test stage without prefilled email"""
 92        self.client.force_login(self.user_noemail)
 93        response = self.client.get(
 94            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 95        )
 96        self.assertStageResponse(
 97            response,
 98            self.flow,
 99            self.user_noemail,
100            component="ak-stage-authenticator-email",
101            email_required=True,
102        )
103
104    @patch(
105        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
106        PropertyMock(return_value=EmailBackend),
107    )
108    def test_stage_submit(self):
109        """Test stage email submission"""
110        # test fail because of existing device
111        response = self.client.get(
112            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
113        )
114        self.assertStageResponse(
115            response,
116            self.flow,
117            self.user,
118            component="ak-stage-access-denied",
119        )
120        self.device.delete()
121        # Initialize the flow
122        response = self.client.get(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
124        )
125        self.assertStageResponse(
126            response,
127            self.flow,
128            self.user,
129            component="ak-stage-authenticator-email",
130            email_required=False,
131        )
132
133        response = self.client.post(
134            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
135            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
136        )
137        self.assertEqual(response.status_code, 200)
138        self.assertEqual(len(mail.outbox), 2)
139        sent_mail = mail.outbox[1]
140        self.assertEqual(sent_mail.subject, self.stage.subject)
141        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
142        # Get from_address from global email config to test if global settings are being used
143        from_address_global = CONFIG.get("email.from")
144        self.assertEqual(sent_mail.from_email, from_address_global)
145
146        self.assertStageResponse(
147            response,
148            self.flow,
149            self.user,
150            component="ak-stage-authenticator-email",
151            response_errors={},
152            email_required=False,
153        )
154
155    def test_email_template(self):
156        """Test email template rendering"""
157        self.device.generate_token()
158        message = self.device._compose_email()
159
160        self.assertIsInstance(message, TemplateEmailMessage)
161        self.assertEqual(message.subject, self.stage.subject)
162        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
163        self.assertTrue(self.device.token in message.body)
164
165    def test_duplicate_email(self):
166        """Test attempting to use same email twice"""
167        email = "test2@authentik.local"
168        # First device
169        EmailDevice.objects.create(
170            user=self.user,
171            stage=self.stage,
172            email=email,
173        )
174        # Attempt to create second device with same email
175        with self.assertRaises(IntegrityError):
176            EmailDevice.objects.create(
177                user=self.user,
178                stage=self.stage,
179                email=email,
180            )
181
182    def test_token_expiry(self):
183        """Test token expiration behavior"""
184        self.device.generate_token()
185        token = self.device.token
186        # Set token as expired
187        self.device.valid_until = now() - timedelta(minutes=1)
188        self.device.save()
189        # Verify expired token fails
190        self.assertFalse(self.device.verify_token(token))
191
192    def test_template_errors(self):
193        """Test handling of template errors"""
194        self.stage.template = "{% invalid template %}"
195        with self.assertRaises(TemplateDoesNotExist):
196            self.stage.send(self.device)
197
198    @patch(
199        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
200        PropertyMock(return_value=EmailBackend),
201    )
202    def test_challenge_response_validation(self):
203        """Test challenge response validation"""
204        # Initialize the flow
205        self.client.force_login(self.user_noemail)
206        response = self.client.get(
207            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208        )
209
210        # Test missing code and email
211        response = self.client.post(
212            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
213            data={"component": "ak-stage-authenticator-email"},
214        )
215        self.assertStageResponse(
216            response,
217            self.flow,
218            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
219        )
220
221        # Test invalid code
222        response = self.client.post(
223            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
224            data={"component": "ak-stage-authenticator-email", "code": "000000"},
225        )
226        self.assertStageResponse(
227            response,
228            self.flow,
229            response_errors={
230                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
231            },
232        )
233
234        # Test valid code
235        device = self.device
236        token = device.token
237        response = self.client.post(
238            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
239            data={"component": "ak-stage-authenticator-email", "code": token},
240        )
241        self.assertEqual(response.status_code, 200)
242        self.assertTrue(device.confirmed)
243
244    @patch(
245        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
246        PropertyMock(return_value=EmailBackend),
247    )
248    def test_challenge_generation(self):
249        """Test challenge generation"""
250        # Test with masked email
251        self.device.delete()
252        response = self.client.get(
253            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
254        )
255        self.assertStageResponse(
256            response,
257            self.flow,
258            self.user,
259            component="ak-stage-authenticator-email",
260            email_required=False,
261        )
262        masked_email = mask_email(self.user.email)
263        self.assertEqual(masked_email, response.json()["email"])
264        self.client.logout()
265
266        # Test without email
267        self.client.force_login(self.user_noemail)
268        response = self.client.get(
269            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
270        )
271        self.assertStageResponse(
272            response,
273            self.flow,
274            self.user_noemail,
275            component="ak-stage-authenticator-email",
276            email_required=True,
277        )
278        self.assertIsNone(response.json()["email"])
279
280    @patch(
281        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
282        PropertyMock(return_value=EmailBackend),
283    )
284    def test_session_management(self):
285        """Test session device management"""
286        # Test device creation in session
287        # Delete any existing devices for this test
288        EmailDevice.objects.filter(user=self.user).delete()
289
290        response = self.client.get(
291            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
292        )
293        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
294        self.assertIsInstance(device, EmailDevice)
295        self.assertFalse(device.confirmed)
296        self.assertEqual(device.user, self.user)
297
298        # Test device confirmation and cleanup
299        device.confirmed = True
300        device.email = "new_test@authentik.local"  # Use a different email
301        response = self.client.post(
302            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
303            data={"component": "ak-stage-authenticator-email", "code": device.token},
304        )
305        self.assertEqual(response.status_code, 200)
306
307    def test_model_properties_and_methods(self):
308        """Test model properties"""
309        device = self.device
310        stage = self.stage
311
312        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
313        self.assertIsInstance(stage.backend, SMTPEmailBackend)
314        self.assertEqual(device.serializer, EmailDeviceSerializer)
315
316        # Test AuthenticatorEmailStage send method
317        self.device.generate_token()
318        # Test EmailDevice _compose_email method
319        message = self.device._compose_email()
320        self.assertIsInstance(message, TemplateEmailMessage)
321        self.assertEqual(message.subject, self.stage.subject)
322        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
323        self.assertTrue(self.device.token in message.body)
324
325    @patch(
326        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
327        PropertyMock(return_value=EmailBackend),
328    )
329    def test_email_tasks(self):
330        # Test AuthenticatorEmailStage send method
331        self.stage.send(self.device)
332        self.assertEqual(len(mail.outbox), 1)

Test Email Authenticator stage

def setUp(self):
32    def setUp(self):
33        super().setUp()
34        self.flow = create_test_flow()
35        self.user = create_test_user()
36        self.user_noemail = create_test_user(email="")
37        self.stage = AuthenticatorEmailStage.objects.create(
38            name="email-authenticator",
39            use_global_settings=True,
40            from_address="test@authentik.local",
41            configure_flow=self.flow,
42            token_expiry="minutes=30",
43        )  # nosec
44        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=0)
45        self.device = EmailDevice.objects.create(
46            user=self.user,
47            stage=self.stage,
48            email="test@authentik.local",
49        )
50        self.client.force_login(self.user)

Hook method for setting up the test fixture before exercising it.

def test_device_str(self):
52    def test_device_str(self):
53        """Test string representation of device"""
54        self.assertEqual(str(self.device), f"Email Device for {self.user.pk}")
55        # Test unsaved device
56        unsaved_device = EmailDevice(
57            user=self.user,
58            stage=self.stage,
59            email="test@authentik.local",
60        )
61        self.assertEqual(str(unsaved_device), "New Email Device")

Test string representation of device

def test_stage_str(self):
63    def test_stage_str(self):
64        """Test string representation of stage"""
65        self.assertEqual(str(self.stage), f"Email Authenticator Stage {self.stage.name}")

Test string representation of stage

def test_token_lifecycle(self):
67    def test_token_lifecycle(self):
68        """Test token generation, validation and expiry"""
69        # Initially no token
70        self.assertIsNone(self.device.token)
71
72        # Generate token
73        self.device.generate_token()
74        token = self.device.token
75        self.assertIsNotNone(token)
76        self.assertIsNotNone(self.device.valid_until)
77        self.assertTrue(self.device.valid_until > now())
78
79        # Verify invalid token
80        self.assertFalse(self.device.verify_token("000000"))
81
82        # Verify correct token (should clear token after verification)
83        self.assertTrue(self.device.verify_token(token))
84        self.assertIsNone(self.device.token)

Test token generation, validation and expiry

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_stage_no_prefill(self):
 86    @patch(
 87        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
 88        PropertyMock(return_value=EmailBackend),
 89    )
 90    def test_stage_no_prefill(self):
 91        """Test stage without prefilled email"""
 92        self.client.force_login(self.user_noemail)
 93        response = self.client.get(
 94            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
 95        )
 96        self.assertStageResponse(
 97            response,
 98            self.flow,
 99            self.user_noemail,
100            component="ak-stage-authenticator-email",
101            email_required=True,
102        )

Test stage without prefilled email

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_stage_submit(self):
104    @patch(
105        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
106        PropertyMock(return_value=EmailBackend),
107    )
108    def test_stage_submit(self):
109        """Test stage email submission"""
110        # test fail because of existing device
111        response = self.client.get(
112            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
113        )
114        self.assertStageResponse(
115            response,
116            self.flow,
117            self.user,
118            component="ak-stage-access-denied",
119        )
120        self.device.delete()
121        # Initialize the flow
122        response = self.client.get(
123            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
124        )
125        self.assertStageResponse(
126            response,
127            self.flow,
128            self.user,
129            component="ak-stage-authenticator-email",
130            email_required=False,
131        )
132
133        response = self.client.post(
134            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
135            data={"component": "ak-stage-authenticator-email", "email": "test@example.com"},
136        )
137        self.assertEqual(response.status_code, 200)
138        self.assertEqual(len(mail.outbox), 2)
139        sent_mail = mail.outbox[1]
140        self.assertEqual(sent_mail.subject, self.stage.subject)
141        self.assertEqual(sent_mail.to, [f"{self.user} <test@example.com>"])
142        # Get from_address from global email config to test if global settings are being used
143        from_address_global = CONFIG.get("email.from")
144        self.assertEqual(sent_mail.from_email, from_address_global)
145
146        self.assertStageResponse(
147            response,
148            self.flow,
149            self.user,
150            component="ak-stage-authenticator-email",
151            response_errors={},
152            email_required=False,
153        )

Test stage email submission

def test_email_template(self):
155    def test_email_template(self):
156        """Test email template rendering"""
157        self.device.generate_token()
158        message = self.device._compose_email()
159
160        self.assertIsInstance(message, TemplateEmailMessage)
161        self.assertEqual(message.subject, self.stage.subject)
162        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
163        self.assertTrue(self.device.token in message.body)

Test email template rendering

def test_duplicate_email(self):
165    def test_duplicate_email(self):
166        """Test attempting to use same email twice"""
167        email = "test2@authentik.local"
168        # First device
169        EmailDevice.objects.create(
170            user=self.user,
171            stage=self.stage,
172            email=email,
173        )
174        # Attempt to create second device with same email
175        with self.assertRaises(IntegrityError):
176            EmailDevice.objects.create(
177                user=self.user,
178                stage=self.stage,
179                email=email,
180            )

Test attempting to use same email twice

def test_token_expiry(self):
182    def test_token_expiry(self):
183        """Test token expiration behavior"""
184        self.device.generate_token()
185        token = self.device.token
186        # Set token as expired
187        self.device.valid_until = now() - timedelta(minutes=1)
188        self.device.save()
189        # Verify expired token fails
190        self.assertFalse(self.device.verify_token(token))

Test token expiration behavior

def test_template_errors(self):
192    def test_template_errors(self):
193        """Test handling of template errors"""
194        self.stage.template = "{% invalid template %}"
195        with self.assertRaises(TemplateDoesNotExist):
196            self.stage.send(self.device)

Test handling of template errors

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_challenge_response_validation(self):
198    @patch(
199        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
200        PropertyMock(return_value=EmailBackend),
201    )
202    def test_challenge_response_validation(self):
203        """Test challenge response validation"""
204        # Initialize the flow
205        self.client.force_login(self.user_noemail)
206        response = self.client.get(
207            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
208        )
209
210        # Test missing code and email
211        response = self.client.post(
212            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
213            data={"component": "ak-stage-authenticator-email"},
214        )
215        self.assertStageResponse(
216            response,
217            self.flow,
218            response_errors={"non_field_errors": [{"code": "invalid", "string": "email required"}]},
219        )
220
221        # Test invalid code
222        response = self.client.post(
223            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
224            data={"component": "ak-stage-authenticator-email", "code": "000000"},
225        )
226        self.assertStageResponse(
227            response,
228            self.flow,
229            response_errors={
230                "non_field_errors": [{"code": "invalid", "string": "Code does not match"}]
231            },
232        )
233
234        # Test valid code
235        device = self.device
236        token = device.token
237        response = self.client.post(
238            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
239            data={"component": "ak-stage-authenticator-email", "code": token},
240        )
241        self.assertEqual(response.status_code, 200)
242        self.assertTrue(device.confirmed)

Test challenge response validation

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_challenge_generation(self):
244    @patch(
245        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
246        PropertyMock(return_value=EmailBackend),
247    )
248    def test_challenge_generation(self):
249        """Test challenge generation"""
250        # Test with masked email
251        self.device.delete()
252        response = self.client.get(
253            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
254        )
255        self.assertStageResponse(
256            response,
257            self.flow,
258            self.user,
259            component="ak-stage-authenticator-email",
260            email_required=False,
261        )
262        masked_email = mask_email(self.user.email)
263        self.assertEqual(masked_email, response.json()["email"])
264        self.client.logout()
265
266        # Test without email
267        self.client.force_login(self.user_noemail)
268        response = self.client.get(
269            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
270        )
271        self.assertStageResponse(
272            response,
273            self.flow,
274            self.user_noemail,
275            component="ak-stage-authenticator-email",
276            email_required=True,
277        )
278        self.assertIsNone(response.json()["email"])

Test challenge generation

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_session_management(self):
280    @patch(
281        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
282        PropertyMock(return_value=EmailBackend),
283    )
284    def test_session_management(self):
285        """Test session device management"""
286        # Test device creation in session
287        # Delete any existing devices for this test
288        EmailDevice.objects.filter(user=self.user).delete()
289
290        response = self.client.get(
291            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
292        )
293        device = self.get_flow_plan().context[PLAN_CONTEXT_EMAIL_DEVICE]
294        self.assertIsInstance(device, EmailDevice)
295        self.assertFalse(device.confirmed)
296        self.assertEqual(device.user, self.user)
297
298        # Test device confirmation and cleanup
299        device.confirmed = True
300        device.email = "new_test@authentik.local"  # Use a different email
301        response = self.client.post(
302            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
303            data={"component": "ak-stage-authenticator-email", "code": device.token},
304        )
305        self.assertEqual(response.status_code, 200)

Test session device management

def test_model_properties_and_methods(self):
307    def test_model_properties_and_methods(self):
308        """Test model properties"""
309        device = self.device
310        stage = self.stage
311
312        self.assertEqual(stage.serializer, AuthenticatorEmailStageSerializer)
313        self.assertIsInstance(stage.backend, SMTPEmailBackend)
314        self.assertEqual(device.serializer, EmailDeviceSerializer)
315
316        # Test AuthenticatorEmailStage send method
317        self.device.generate_token()
318        # Test EmailDevice _compose_email method
319        message = self.device._compose_email()
320        self.assertIsInstance(message, TemplateEmailMessage)
321        self.assertEqual(message.subject, self.stage.subject)
322        self.assertEqual(message.to, [f"{self.user.name} <{self.device.email}>"])
323        self.assertTrue(self.device.token in message.body)

Test model properties

@patch('authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_email_tasks(self):
325    @patch(
326        "authentik.stages.authenticator_email.models.AuthenticatorEmailStage.backend_class",
327        PropertyMock(return_value=EmailBackend),
328    )
329    def test_email_tasks(self):
330        # Test AuthenticatorEmailStage send method
331        self.stage.send(self.device)
332        self.assertEqual(len(mail.outbox), 1)