authentik.stages.authenticator_validate.tests.test_stage

Test validator stage

  1"""Test validator stage"""
  2
  3from unittest.mock import MagicMock, patch
  4
  5from django.test.client import RequestFactory
  6from django.urls.base import reverse
  7from django.utils.timezone import now
  8
  9from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 10from authentik.flows.models import FlowDesignation, FlowStageBinding, NotConfiguredAction
 11from authentik.flows.planner import FlowPlan
 12from authentik.flows.tests import FlowTestCase
 13from authentik.flows.views.executor import SESSION_KEY_PLAN
 14from authentik.lib.generators import generate_id, generate_key
 15from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
 16from authentik.stages.authenticator_static.models import AuthenticatorStaticStage
 17from authentik.stages.authenticator_totp.models import AuthenticatorTOTPStage, TOTPDigits
 18from authentik.stages.authenticator_validate.api import AuthenticatorValidateStageSerializer
 19from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 20from authentik.stages.authenticator_validate.stage import PLAN_CONTEXT_DEVICE_CHALLENGES
 21from authentik.stages.identification.models import IdentificationStage, UserFields
 22
 23
 24class AuthenticatorValidateStageTests(FlowTestCase):
 25    """Test validator stage"""
 26
 27    def setUp(self) -> None:
 28        self.user = create_test_admin_user()
 29        self.request_factory = RequestFactory()
 30
 31    def test_not_configured_action(self):
 32        """Test not_configured_action"""
 33        ident_stage = IdentificationStage.objects.create(
 34            name=generate_id(),
 35            user_fields=[
 36                UserFields.USERNAME,
 37            ],
 38        )
 39        conf_stage = AuthenticatorStaticStage.objects.create(
 40            name=generate_id(),
 41        )
 42        stage = AuthenticatorValidateStage.objects.create(
 43            name=generate_id(),
 44            not_configured_action=NotConfiguredAction.CONFIGURE,
 45        )
 46        stage.configuration_stages.set([conf_stage])
 47        flow = create_test_flow()
 48        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
 49        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
 50
 51        response = self.client.get(
 52            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 53        )
 54        self.assertEqual(response.status_code, 200)
 55        response = self.client.post(
 56            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 57            {"uid_field": self.user.username},
 58        )
 59        self.assertEqual(response.status_code, 302)
 60        response = self.client.get(
 61            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 62            follow=True,
 63        )
 64        self.assertStageResponse(
 65            response,
 66            flow,
 67            component="ak-stage-authenticator-static",
 68        )
 69
 70    def test_not_configured_action_multiple(self):
 71        """Test not_configured_action"""
 72        ident_stage = IdentificationStage.objects.create(
 73            name=generate_id(),
 74            user_fields=[
 75                UserFields.USERNAME,
 76            ],
 77        )
 78        conf_stage = AuthenticatorStaticStage.objects.create(
 79            name=generate_id(),
 80        )
 81        conf_stage2 = AuthenticatorTOTPStage.objects.create(
 82            name=generate_id(), digits=TOTPDigits.SIX
 83        )
 84        stage = AuthenticatorValidateStage.objects.create(
 85            name=generate_id(),
 86            not_configured_action=NotConfiguredAction.CONFIGURE,
 87        )
 88        stage.configuration_stages.set([conf_stage, conf_stage2])
 89        flow = create_test_flow()
 90        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
 91        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
 92
 93        # Get initial identification stage
 94        response = self.client.get(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 96        )
 97        self.assertEqual(response.status_code, 200)
 98        # Answer initial identification stage
 99        response = self.client.post(
100            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
101            {"uid_field": self.user.username},
102        )
103        self.assertEqual(response.status_code, 302)
104        # Get list of all configuration stages
105        response = self.client.get(
106            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
107        )
108        self.assertEqual(response.status_code, 200)
109        # Select stage
110        response = self.client.post(
111            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
112            {"selected_stage": conf_stage.pk},
113        )
114        self.assertEqual(response.status_code, 302)
115        # get actual identification stage response
116        response = self.client.get(
117            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
118        )
119        self.assertEqual(response.status_code, 200)
120        self.assertStageResponse(
121            response,
122            flow,
123            component="ak-stage-authenticator-static",
124        )
125
126    def test_stage_validation(self):
127        """Test serializer validation"""
128        self.client.force_login(self.user)
129        serializer = AuthenticatorValidateStageSerializer(
130            data={
131                "name": generate_id(),
132                "not_configured_action": NotConfiguredAction.CONFIGURE,
133            }
134        )
135        self.assertFalse(serializer.is_valid())
136        self.assertIn("not_configured_action", serializer.errors)
137        serializer = AuthenticatorValidateStageSerializer(
138            data={"name": generate_id(), "not_configured_action": NotConfiguredAction.DENY}
139        )
140        self.assertTrue(serializer.is_valid())
141
142    def test_validate_selected_challenge(self):
143        """Test validate_selected_challenge"""
144        flow = create_test_flow()
145        stage = AuthenticatorValidateStage.objects.create(
146            name=generate_id(),
147            not_configured_action=NotConfiguredAction.CONFIGURE,
148            device_classes=[DeviceClasses.STATIC, DeviceClasses.TOTP],
149        )
150
151        session = self.client.session
152        plan = FlowPlan(flow_pk=flow.pk.hex)
153        plan.append_stage(stage)
154        plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = [
155            {
156                "device_class": DeviceClasses.STATIC,
157                "device_uid": "1",
158                "challenge": {},
159                "last_used": now(),
160            },
161            {
162                "device_class": DeviceClasses.TOTP,
163                "device_uid": "2",
164                "challenge": {},
165                "last_used": now(),
166            },
167        ]
168        session[SESSION_KEY_PLAN] = plan
169        session.save()
170
171        response = self.client.post(
172            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
173            data={
174                "selected_challenge": {
175                    "device_class": DeviceClasses.WEBAUTHN,
176                    "device_uid": "quox",
177                    "challenge": {},
178                    "last_used": None,
179                }
180            },
181        )
182        self.assertStageResponse(
183            response,
184            flow,
185            response_errors={
186                "selected_challenge": [{"string": "invalid challenge selected", "code": "invalid"}]
187            },
188            component="ak-stage-authenticator-validate",
189        )
190
191        response = self.client.post(
192            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
193            data={
194                "selected_challenge": {
195                    "device_class": "static",
196                    "device_uid": "1",
197                    "challenge": {},
198                    "last_used": None,
199                },
200            },
201        )
202        self.assertStageResponse(
203            response,
204            flow,
205            response_errors={"non_field_errors": [{"string": "Empty response", "code": "invalid"}]},
206            component="ak-stage-authenticator-validate",
207        )
208
209    @patch(
210        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
211        MagicMock(
212            return_value=MagicMock(
213                auth=MagicMock(
214                    return_value={
215                        "result": "allow",
216                        "status": "allow",
217                        "status_msg": "Success. Logging you in...",
218                    }
219                )
220            )
221        ),
222    )
223    def test_non_authentication_flow(self):
224        """Test full in an authorization flow (no pending user)"""
225        self.client.force_login(self.user)
226        duo_stage = AuthenticatorDuoStage.objects.create(
227            name=generate_id(),
228            client_id=generate_id(),
229            client_secret=generate_key(),
230            api_hostname="",
231        )
232        duo_device = DuoDevice.objects.create(
233            user=self.user,
234            stage=duo_stage,
235        )
236
237        flow = create_test_flow(FlowDesignation.AUTHORIZATION)
238        stage = AuthenticatorValidateStage.objects.create(
239            name=generate_id(),
240            device_classes=[DeviceClasses.DUO],
241        )
242
243        plan = FlowPlan(flow_pk=flow.pk.hex)
244        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
245        session = self.client.session
246        session[SESSION_KEY_PLAN] = plan
247        session.save()
248
249        response = self.client.get(
250            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
251        )
252        self.assertEqual(response.status_code, 200)
253
254        response = self.client.post(
255            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
256            {"duo": duo_device.pk},
257            follow=True,
258        )
259
260        self.assertEqual(response.status_code, 200)
261        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
class AuthenticatorValidateStageTests(authentik.flows.tests.FlowTestCase):
 25class AuthenticatorValidateStageTests(FlowTestCase):
 26    """Test validator stage"""
 27
 28    def setUp(self) -> None:
 29        self.user = create_test_admin_user()
 30        self.request_factory = RequestFactory()
 31
 32    def test_not_configured_action(self):
 33        """Test not_configured_action"""
 34        ident_stage = IdentificationStage.objects.create(
 35            name=generate_id(),
 36            user_fields=[
 37                UserFields.USERNAME,
 38            ],
 39        )
 40        conf_stage = AuthenticatorStaticStage.objects.create(
 41            name=generate_id(),
 42        )
 43        stage = AuthenticatorValidateStage.objects.create(
 44            name=generate_id(),
 45            not_configured_action=NotConfiguredAction.CONFIGURE,
 46        )
 47        stage.configuration_stages.set([conf_stage])
 48        flow = create_test_flow()
 49        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
 50        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
 51
 52        response = self.client.get(
 53            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 54        )
 55        self.assertEqual(response.status_code, 200)
 56        response = self.client.post(
 57            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 58            {"uid_field": self.user.username},
 59        )
 60        self.assertEqual(response.status_code, 302)
 61        response = self.client.get(
 62            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 63            follow=True,
 64        )
 65        self.assertStageResponse(
 66            response,
 67            flow,
 68            component="ak-stage-authenticator-static",
 69        )
 70
 71    def test_not_configured_action_multiple(self):
 72        """Test not_configured_action"""
 73        ident_stage = IdentificationStage.objects.create(
 74            name=generate_id(),
 75            user_fields=[
 76                UserFields.USERNAME,
 77            ],
 78        )
 79        conf_stage = AuthenticatorStaticStage.objects.create(
 80            name=generate_id(),
 81        )
 82        conf_stage2 = AuthenticatorTOTPStage.objects.create(
 83            name=generate_id(), digits=TOTPDigits.SIX
 84        )
 85        stage = AuthenticatorValidateStage.objects.create(
 86            name=generate_id(),
 87            not_configured_action=NotConfiguredAction.CONFIGURE,
 88        )
 89        stage.configuration_stages.set([conf_stage, conf_stage2])
 90        flow = create_test_flow()
 91        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
 92        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
 93
 94        # Get initial identification stage
 95        response = self.client.get(
 96            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 97        )
 98        self.assertEqual(response.status_code, 200)
 99        # Answer initial identification stage
100        response = self.client.post(
101            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
102            {"uid_field": self.user.username},
103        )
104        self.assertEqual(response.status_code, 302)
105        # Get list of all configuration stages
106        response = self.client.get(
107            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
108        )
109        self.assertEqual(response.status_code, 200)
110        # Select stage
111        response = self.client.post(
112            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
113            {"selected_stage": conf_stage.pk},
114        )
115        self.assertEqual(response.status_code, 302)
116        # get actual identification stage response
117        response = self.client.get(
118            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
119        )
120        self.assertEqual(response.status_code, 200)
121        self.assertStageResponse(
122            response,
123            flow,
124            component="ak-stage-authenticator-static",
125        )
126
127    def test_stage_validation(self):
128        """Test serializer validation"""
129        self.client.force_login(self.user)
130        serializer = AuthenticatorValidateStageSerializer(
131            data={
132                "name": generate_id(),
133                "not_configured_action": NotConfiguredAction.CONFIGURE,
134            }
135        )
136        self.assertFalse(serializer.is_valid())
137        self.assertIn("not_configured_action", serializer.errors)
138        serializer = AuthenticatorValidateStageSerializer(
139            data={"name": generate_id(), "not_configured_action": NotConfiguredAction.DENY}
140        )
141        self.assertTrue(serializer.is_valid())
142
143    def test_validate_selected_challenge(self):
144        """Test validate_selected_challenge"""
145        flow = create_test_flow()
146        stage = AuthenticatorValidateStage.objects.create(
147            name=generate_id(),
148            not_configured_action=NotConfiguredAction.CONFIGURE,
149            device_classes=[DeviceClasses.STATIC, DeviceClasses.TOTP],
150        )
151
152        session = self.client.session
153        plan = FlowPlan(flow_pk=flow.pk.hex)
154        plan.append_stage(stage)
155        plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = [
156            {
157                "device_class": DeviceClasses.STATIC,
158                "device_uid": "1",
159                "challenge": {},
160                "last_used": now(),
161            },
162            {
163                "device_class": DeviceClasses.TOTP,
164                "device_uid": "2",
165                "challenge": {},
166                "last_used": now(),
167            },
168        ]
169        session[SESSION_KEY_PLAN] = plan
170        session.save()
171
172        response = self.client.post(
173            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
174            data={
175                "selected_challenge": {
176                    "device_class": DeviceClasses.WEBAUTHN,
177                    "device_uid": "quox",
178                    "challenge": {},
179                    "last_used": None,
180                }
181            },
182        )
183        self.assertStageResponse(
184            response,
185            flow,
186            response_errors={
187                "selected_challenge": [{"string": "invalid challenge selected", "code": "invalid"}]
188            },
189            component="ak-stage-authenticator-validate",
190        )
191
192        response = self.client.post(
193            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
194            data={
195                "selected_challenge": {
196                    "device_class": "static",
197                    "device_uid": "1",
198                    "challenge": {},
199                    "last_used": None,
200                },
201            },
202        )
203        self.assertStageResponse(
204            response,
205            flow,
206            response_errors={"non_field_errors": [{"string": "Empty response", "code": "invalid"}]},
207            component="ak-stage-authenticator-validate",
208        )
209
210    @patch(
211        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
212        MagicMock(
213            return_value=MagicMock(
214                auth=MagicMock(
215                    return_value={
216                        "result": "allow",
217                        "status": "allow",
218                        "status_msg": "Success. Logging you in...",
219                    }
220                )
221            )
222        ),
223    )
224    def test_non_authentication_flow(self):
225        """Test full in an authorization flow (no pending user)"""
226        self.client.force_login(self.user)
227        duo_stage = AuthenticatorDuoStage.objects.create(
228            name=generate_id(),
229            client_id=generate_id(),
230            client_secret=generate_key(),
231            api_hostname="",
232        )
233        duo_device = DuoDevice.objects.create(
234            user=self.user,
235            stage=duo_stage,
236        )
237
238        flow = create_test_flow(FlowDesignation.AUTHORIZATION)
239        stage = AuthenticatorValidateStage.objects.create(
240            name=generate_id(),
241            device_classes=[DeviceClasses.DUO],
242        )
243
244        plan = FlowPlan(flow_pk=flow.pk.hex)
245        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
246        session = self.client.session
247        session[SESSION_KEY_PLAN] = plan
248        session.save()
249
250        response = self.client.get(
251            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
252        )
253        self.assertEqual(response.status_code, 200)
254
255        response = self.client.post(
256            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
257            {"duo": duo_device.pk},
258            follow=True,
259        )
260
261        self.assertEqual(response.status_code, 200)
262        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test validator stage

def setUp(self) -> None:
28    def setUp(self) -> None:
29        self.user = create_test_admin_user()
30        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_not_configured_action(self):
32    def test_not_configured_action(self):
33        """Test not_configured_action"""
34        ident_stage = IdentificationStage.objects.create(
35            name=generate_id(),
36            user_fields=[
37                UserFields.USERNAME,
38            ],
39        )
40        conf_stage = AuthenticatorStaticStage.objects.create(
41            name=generate_id(),
42        )
43        stage = AuthenticatorValidateStage.objects.create(
44            name=generate_id(),
45            not_configured_action=NotConfiguredAction.CONFIGURE,
46        )
47        stage.configuration_stages.set([conf_stage])
48        flow = create_test_flow()
49        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
50        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
51
52        response = self.client.get(
53            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
54        )
55        self.assertEqual(response.status_code, 200)
56        response = self.client.post(
57            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
58            {"uid_field": self.user.username},
59        )
60        self.assertEqual(response.status_code, 302)
61        response = self.client.get(
62            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
63            follow=True,
64        )
65        self.assertStageResponse(
66            response,
67            flow,
68            component="ak-stage-authenticator-static",
69        )

Test not_configured_action

def test_not_configured_action_multiple(self):
 71    def test_not_configured_action_multiple(self):
 72        """Test not_configured_action"""
 73        ident_stage = IdentificationStage.objects.create(
 74            name=generate_id(),
 75            user_fields=[
 76                UserFields.USERNAME,
 77            ],
 78        )
 79        conf_stage = AuthenticatorStaticStage.objects.create(
 80            name=generate_id(),
 81        )
 82        conf_stage2 = AuthenticatorTOTPStage.objects.create(
 83            name=generate_id(), digits=TOTPDigits.SIX
 84        )
 85        stage = AuthenticatorValidateStage.objects.create(
 86            name=generate_id(),
 87            not_configured_action=NotConfiguredAction.CONFIGURE,
 88        )
 89        stage.configuration_stages.set([conf_stage, conf_stage2])
 90        flow = create_test_flow()
 91        FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
 92        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
 93
 94        # Get initial identification stage
 95        response = self.client.get(
 96            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
 97        )
 98        self.assertEqual(response.status_code, 200)
 99        # Answer initial identification stage
100        response = self.client.post(
101            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
102            {"uid_field": self.user.username},
103        )
104        self.assertEqual(response.status_code, 302)
105        # Get list of all configuration stages
106        response = self.client.get(
107            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
108        )
109        self.assertEqual(response.status_code, 200)
110        # Select stage
111        response = self.client.post(
112            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
113            {"selected_stage": conf_stage.pk},
114        )
115        self.assertEqual(response.status_code, 302)
116        # get actual identification stage response
117        response = self.client.get(
118            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
119        )
120        self.assertEqual(response.status_code, 200)
121        self.assertStageResponse(
122            response,
123            flow,
124            component="ak-stage-authenticator-static",
125        )

Test not_configured_action

def test_stage_validation(self):
127    def test_stage_validation(self):
128        """Test serializer validation"""
129        self.client.force_login(self.user)
130        serializer = AuthenticatorValidateStageSerializer(
131            data={
132                "name": generate_id(),
133                "not_configured_action": NotConfiguredAction.CONFIGURE,
134            }
135        )
136        self.assertFalse(serializer.is_valid())
137        self.assertIn("not_configured_action", serializer.errors)
138        serializer = AuthenticatorValidateStageSerializer(
139            data={"name": generate_id(), "not_configured_action": NotConfiguredAction.DENY}
140        )
141        self.assertTrue(serializer.is_valid())

Test serializer validation

def test_validate_selected_challenge(self):
143    def test_validate_selected_challenge(self):
144        """Test validate_selected_challenge"""
145        flow = create_test_flow()
146        stage = AuthenticatorValidateStage.objects.create(
147            name=generate_id(),
148            not_configured_action=NotConfiguredAction.CONFIGURE,
149            device_classes=[DeviceClasses.STATIC, DeviceClasses.TOTP],
150        )
151
152        session = self.client.session
153        plan = FlowPlan(flow_pk=flow.pk.hex)
154        plan.append_stage(stage)
155        plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = [
156            {
157                "device_class": DeviceClasses.STATIC,
158                "device_uid": "1",
159                "challenge": {},
160                "last_used": now(),
161            },
162            {
163                "device_class": DeviceClasses.TOTP,
164                "device_uid": "2",
165                "challenge": {},
166                "last_used": now(),
167            },
168        ]
169        session[SESSION_KEY_PLAN] = plan
170        session.save()
171
172        response = self.client.post(
173            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
174            data={
175                "selected_challenge": {
176                    "device_class": DeviceClasses.WEBAUTHN,
177                    "device_uid": "quox",
178                    "challenge": {},
179                    "last_used": None,
180                }
181            },
182        )
183        self.assertStageResponse(
184            response,
185            flow,
186            response_errors={
187                "selected_challenge": [{"string": "invalid challenge selected", "code": "invalid"}]
188            },
189            component="ak-stage-authenticator-validate",
190        )
191
192        response = self.client.post(
193            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
194            data={
195                "selected_challenge": {
196                    "device_class": "static",
197                    "device_uid": "1",
198                    "challenge": {},
199                    "last_used": None,
200                },
201            },
202        )
203        self.assertStageResponse(
204            response,
205            flow,
206            response_errors={"non_field_errors": [{"string": "Empty response", "code": "invalid"}]},
207            component="ak-stage-authenticator-validate",
208        )

Test validate_selected_challenge

@patch('authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client', MagicMock(return_value=MagicMock(auth=MagicMock(return_value={'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}))))
def test_non_authentication_flow(self):
210    @patch(
211        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
212        MagicMock(
213            return_value=MagicMock(
214                auth=MagicMock(
215                    return_value={
216                        "result": "allow",
217                        "status": "allow",
218                        "status_msg": "Success. Logging you in...",
219                    }
220                )
221            )
222        ),
223    )
224    def test_non_authentication_flow(self):
225        """Test full in an authorization flow (no pending user)"""
226        self.client.force_login(self.user)
227        duo_stage = AuthenticatorDuoStage.objects.create(
228            name=generate_id(),
229            client_id=generate_id(),
230            client_secret=generate_key(),
231            api_hostname="",
232        )
233        duo_device = DuoDevice.objects.create(
234            user=self.user,
235            stage=duo_stage,
236        )
237
238        flow = create_test_flow(FlowDesignation.AUTHORIZATION)
239        stage = AuthenticatorValidateStage.objects.create(
240            name=generate_id(),
241            device_classes=[DeviceClasses.DUO],
242        )
243
244        plan = FlowPlan(flow_pk=flow.pk.hex)
245        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
246        session = self.client.session
247        session[SESSION_KEY_PLAN] = plan
248        session.save()
249
250        response = self.client.get(
251            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
252        )
253        self.assertEqual(response.status_code, 200)
254
255        response = self.client.post(
256            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
257            {"duo": duo_device.pk},
258            follow=True,
259        )
260
261        self.assertEqual(response.status_code, 200)
262        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test full in an authorization flow (no pending user)