authentik.stages.authenticator_duo.tests

Test duo stage

  1"""Test duo stage"""
  2
  3from unittest.mock import MagicMock, patch
  4from uuid import uuid4
  5
  6from django.test.client import RequestFactory
  7from django.urls import reverse
  8
  9from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 10from authentik.flows.models import FlowStageBinding
 11from authentik.flows.tests import FlowTestCase
 12from authentik.lib.generators import generate_id
 13from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
 14from authentik.stages.identification.models import IdentificationStage, UserFields
 15
 16
 17class AuthenticatorDuoStageTests(FlowTestCase):
 18    """Test duo stage"""
 19
 20    def setUp(self) -> None:
 21        self.user = create_test_admin_user()
 22        self.request_factory = RequestFactory()
 23
 24    def test_client(self):
 25        """Test Duo client setup"""
 26        stage = AuthenticatorDuoStage(
 27            name=generate_id(),
 28            client_id=generate_id(),
 29            client_secret=generate_id(),
 30            admin_integration_key=generate_id(),
 31            admin_secret_key=generate_id(),
 32            api_hostname=generate_id(),
 33        )
 34        self.assertEqual(stage.auth_client().ikey, stage.client_id)
 35        self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)
 36        stage.admin_integration_key = ""
 37        with self.assertRaises(ValueError):
 38            self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)
 39
 40    def test_api_enrollment_invalid(self):
 41        """Test `enrollment_status`"""
 42        self.client.force_login(self.user)
 43        response = self.client.post(
 44            reverse(
 45                "authentik_api:authenticatorduostage-enrollment-status",
 46                kwargs={
 47                    "pk": str(uuid4()),
 48                },
 49            )
 50        )
 51        self.assertEqual(response.status_code, 404)
 52
 53    def test_api_import_manual_invalid_username(self):
 54        """Test `import_device_manual`"""
 55        self.client.force_login(self.user)
 56        stage = AuthenticatorDuoStage.objects.create(
 57            name=generate_id(),
 58            client_id=generate_id(),
 59            client_secret=generate_id(),
 60            api_hostname=generate_id(),
 61        )
 62        response = self.client.post(
 63            reverse(
 64                "authentik_api:authenticatorduostage-import-device-manual",
 65                kwargs={
 66                    "pk": str(stage.pk),
 67                },
 68            ),
 69            data={
 70                "username": generate_id(),
 71            },
 72        )
 73        self.assertEqual(response.status_code, 400)
 74
 75    def test_api_import_manual_duplicate_device(self):
 76        """Test `import_device_manual`"""
 77        self.client.force_login(self.user)
 78        stage = AuthenticatorDuoStage.objects.create(
 79            name=generate_id(),
 80            client_id=generate_id(),
 81            client_secret=generate_id(),
 82            api_hostname=generate_id(),
 83        )
 84        device = DuoDevice.objects.create(
 85            name="foo",
 86            duo_user_id=generate_id(),
 87            user=self.user,
 88            stage=stage,
 89        )
 90        response = self.client.post(
 91            reverse(
 92                "authentik_api:authenticatorduostage-import-device-manual",
 93                kwargs={
 94                    "pk": str(stage.pk),
 95                },
 96            ),
 97            data={
 98                "username": self.user.username,
 99                "duo_user_id": device.duo_user_id,
100            },
101        )
102        self.assertEqual(response.status_code, 400)
103
104    def test_api_import_manual(self):
105        """Test `import_device_manual`"""
106        self.client.force_login(self.user)
107        stage = AuthenticatorDuoStage.objects.create(
108            name=generate_id(),
109            client_id=generate_id(),
110            client_secret=generate_id(),
111            api_hostname=generate_id(),
112        )
113        response = self.client.post(
114            reverse(
115                "authentik_api:authenticatorduostage-import-device-manual",
116                kwargs={
117                    "pk": str(stage.pk),
118                },
119            ),
120            data={
121                "username": self.user.username,
122                "duo_user_id": "foo",
123            },
124        )
125        self.assertEqual(response.status_code, 204)
126
127    def test_api_import_automatic_invalid(self):
128        """test `import_devices_automatic`"""
129        self.client.force_login(self.user)
130        stage = AuthenticatorDuoStage.objects.create(
131            name=generate_id(),
132            client_id=generate_id(),
133            client_secret=generate_id(),
134            api_hostname=generate_id(),
135        )
136
137        # Test missing admin credentials
138        response = self.client.post(
139            reverse(
140                "authentik_api:authenticatorduostage-import-devices-automatic",
141                kwargs={
142                    "pk": str(stage.pk),
143                },
144            ),
145        )
146        self.assertEqual(response.status_code, 400)
147
148        # Test internal error handling
149        stage.admin_integration_key = generate_id()
150        stage.admin_secret_key = generate_id()
151        stage.save()
152        with patch(
153            "duo_client.admin.Admin.get_users_iterator",
154            MagicMock(side_effect=RuntimeError("Duo API error")),
155        ):
156            response = self.client.post(
157                reverse(
158                    "authentik_api:authenticatorduostage-import-devices-automatic",
159                    kwargs={
160                        "pk": str(stage.pk),
161                    },
162                ),
163            )
164            self.assertEqual(response.status_code, 400)
165            self.assertJSONEqual(
166                response.content,
167                {
168                    "error": "An internal error occurred while importing devices.",
169                    "count": 0,
170                },
171            )
172
173    def test_api_import_automatic(self):
174        """test `import_devices_automatic`"""
175        self.client.force_login(self.user)
176        stage = AuthenticatorDuoStage.objects.create(
177            name=generate_id(),
178            client_id=generate_id(),
179            client_secret=generate_id(),
180            admin_integration_key=generate_id(),
181            admin_secret_key=generate_id(),
182            api_hostname=generate_id(),
183        )
184        device = DuoDevice.objects.create(
185            name="foo",
186            duo_user_id=generate_id(),
187            user=self.user,
188            stage=stage,
189        )
190        with patch(
191            "duo_client.admin.Admin.get_users_iterator",
192            MagicMock(
193                return_value=[
194                    {
195                        "user_id": "foo",
196                        "username": "bar",
197                    },
198                    {
199                        "user_id": device.duo_user_id,
200                        "username": self.user.username,
201                    },
202                    {
203                        "user_id": generate_id(),
204                        "username": self.user.username,
205                    },
206                ]
207            ),
208        ):
209            response = self.client.post(
210                reverse(
211                    "authentik_api:authenticatorduostage-import-devices-automatic",
212                    kwargs={
213                        "pk": str(stage.pk),
214                    },
215                ),
216            )
217            self.assertEqual(response.status_code, 200)
218            self.assertEqual(response.content.decode(), '{"error":"","count":1}')
219
220    def test_stage_enroll_basic(self):
221        """Test stage"""
222        conf_stage = IdentificationStage.objects.create(
223            name=generate_id(),
224            user_fields=[
225                UserFields.USERNAME,
226            ],
227        )
228        stage = AuthenticatorDuoStage.objects.create(
229            name=generate_id(),
230            client_id=generate_id(),
231            client_secret=generate_id(),
232            api_hostname=generate_id(),
233        )
234        flow = create_test_flow()
235        FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
236        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
237
238        response = self.client.post(
239            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
240            {"uid_field": self.user.username},
241        )
242        self.assertEqual(response.status_code, 302)
243
244        enroll_mock = MagicMock(
245            return_value={
246                "user_id": "foo",
247                "activation_barcode": "bar",
248                "activation_code": "bar",
249            }
250        )
251        with patch("duo_client.auth.Auth.enroll", enroll_mock):
252            response = self.client.get(
253                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
254                follow=True,
255            )
256            self.assertStageResponse(
257                response,
258                flow,
259                component="ak-stage-authenticator-duo",
260                pending_user=self.user.username,
261                activation_barcode="bar",
262                activation_code="bar",
263            )
264
265            response = self.client.get(
266                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
267                follow=True,
268            )
269            self.assertStageResponse(
270                response,
271                flow,
272                component="ak-stage-authenticator-duo",
273                pending_user=self.user.username,
274                activation_barcode="bar",
275                activation_code="bar",
276            )
277            self.assertEqual(enroll_mock.call_count, 1)
278
279            with patch("duo_client.auth.Auth.enroll_status", MagicMock(return_value="success")):
280                response = self.client.post(
281                    reverse(
282                        "authentik_api:authenticatorduostage-enrollment-status",
283                        kwargs={
284                            "pk": str(stage.pk),
285                        },
286                    )
287                )
288                self.assertEqual(response.status_code, 200)
289                self.assertJSONEqual(response.content, {"duo_response": "success"})
290
291                response = self.client.post(
292                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), {}
293                )
294                self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
class AuthenticatorDuoStageTests(authentik.flows.tests.FlowTestCase):
 18class AuthenticatorDuoStageTests(FlowTestCase):
 19    """Test duo stage"""
 20
 21    def setUp(self) -> None:
 22        self.user = create_test_admin_user()
 23        self.request_factory = RequestFactory()
 24
 25    def test_client(self):
 26        """Test Duo client setup"""
 27        stage = AuthenticatorDuoStage(
 28            name=generate_id(),
 29            client_id=generate_id(),
 30            client_secret=generate_id(),
 31            admin_integration_key=generate_id(),
 32            admin_secret_key=generate_id(),
 33            api_hostname=generate_id(),
 34        )
 35        self.assertEqual(stage.auth_client().ikey, stage.client_id)
 36        self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)
 37        stage.admin_integration_key = ""
 38        with self.assertRaises(ValueError):
 39            self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)
 40
 41    def test_api_enrollment_invalid(self):
 42        """Test `enrollment_status`"""
 43        self.client.force_login(self.user)
 44        response = self.client.post(
 45            reverse(
 46                "authentik_api:authenticatorduostage-enrollment-status",
 47                kwargs={
 48                    "pk": str(uuid4()),
 49                },
 50            )
 51        )
 52        self.assertEqual(response.status_code, 404)
 53
 54    def test_api_import_manual_invalid_username(self):
 55        """Test `import_device_manual`"""
 56        self.client.force_login(self.user)
 57        stage = AuthenticatorDuoStage.objects.create(
 58            name=generate_id(),
 59            client_id=generate_id(),
 60            client_secret=generate_id(),
 61            api_hostname=generate_id(),
 62        )
 63        response = self.client.post(
 64            reverse(
 65                "authentik_api:authenticatorduostage-import-device-manual",
 66                kwargs={
 67                    "pk": str(stage.pk),
 68                },
 69            ),
 70            data={
 71                "username": generate_id(),
 72            },
 73        )
 74        self.assertEqual(response.status_code, 400)
 75
 76    def test_api_import_manual_duplicate_device(self):
 77        """Test `import_device_manual`"""
 78        self.client.force_login(self.user)
 79        stage = AuthenticatorDuoStage.objects.create(
 80            name=generate_id(),
 81            client_id=generate_id(),
 82            client_secret=generate_id(),
 83            api_hostname=generate_id(),
 84        )
 85        device = DuoDevice.objects.create(
 86            name="foo",
 87            duo_user_id=generate_id(),
 88            user=self.user,
 89            stage=stage,
 90        )
 91        response = self.client.post(
 92            reverse(
 93                "authentik_api:authenticatorduostage-import-device-manual",
 94                kwargs={
 95                    "pk": str(stage.pk),
 96                },
 97            ),
 98            data={
 99                "username": self.user.username,
100                "duo_user_id": device.duo_user_id,
101            },
102        )
103        self.assertEqual(response.status_code, 400)
104
105    def test_api_import_manual(self):
106        """Test `import_device_manual`"""
107        self.client.force_login(self.user)
108        stage = AuthenticatorDuoStage.objects.create(
109            name=generate_id(),
110            client_id=generate_id(),
111            client_secret=generate_id(),
112            api_hostname=generate_id(),
113        )
114        response = self.client.post(
115            reverse(
116                "authentik_api:authenticatorduostage-import-device-manual",
117                kwargs={
118                    "pk": str(stage.pk),
119                },
120            ),
121            data={
122                "username": self.user.username,
123                "duo_user_id": "foo",
124            },
125        )
126        self.assertEqual(response.status_code, 204)
127
128    def test_api_import_automatic_invalid(self):
129        """test `import_devices_automatic`"""
130        self.client.force_login(self.user)
131        stage = AuthenticatorDuoStage.objects.create(
132            name=generate_id(),
133            client_id=generate_id(),
134            client_secret=generate_id(),
135            api_hostname=generate_id(),
136        )
137
138        # Test missing admin credentials
139        response = self.client.post(
140            reverse(
141                "authentik_api:authenticatorduostage-import-devices-automatic",
142                kwargs={
143                    "pk": str(stage.pk),
144                },
145            ),
146        )
147        self.assertEqual(response.status_code, 400)
148
149        # Test internal error handling
150        stage.admin_integration_key = generate_id()
151        stage.admin_secret_key = generate_id()
152        stage.save()
153        with patch(
154            "duo_client.admin.Admin.get_users_iterator",
155            MagicMock(side_effect=RuntimeError("Duo API error")),
156        ):
157            response = self.client.post(
158                reverse(
159                    "authentik_api:authenticatorduostage-import-devices-automatic",
160                    kwargs={
161                        "pk": str(stage.pk),
162                    },
163                ),
164            )
165            self.assertEqual(response.status_code, 400)
166            self.assertJSONEqual(
167                response.content,
168                {
169                    "error": "An internal error occurred while importing devices.",
170                    "count": 0,
171                },
172            )
173
174    def test_api_import_automatic(self):
175        """test `import_devices_automatic`"""
176        self.client.force_login(self.user)
177        stage = AuthenticatorDuoStage.objects.create(
178            name=generate_id(),
179            client_id=generate_id(),
180            client_secret=generate_id(),
181            admin_integration_key=generate_id(),
182            admin_secret_key=generate_id(),
183            api_hostname=generate_id(),
184        )
185        device = DuoDevice.objects.create(
186            name="foo",
187            duo_user_id=generate_id(),
188            user=self.user,
189            stage=stage,
190        )
191        with patch(
192            "duo_client.admin.Admin.get_users_iterator",
193            MagicMock(
194                return_value=[
195                    {
196                        "user_id": "foo",
197                        "username": "bar",
198                    },
199                    {
200                        "user_id": device.duo_user_id,
201                        "username": self.user.username,
202                    },
203                    {
204                        "user_id": generate_id(),
205                        "username": self.user.username,
206                    },
207                ]
208            ),
209        ):
210            response = self.client.post(
211                reverse(
212                    "authentik_api:authenticatorduostage-import-devices-automatic",
213                    kwargs={
214                        "pk": str(stage.pk),
215                    },
216                ),
217            )
218            self.assertEqual(response.status_code, 200)
219            self.assertEqual(response.content.decode(), '{"error":"","count":1}')
220
221    def test_stage_enroll_basic(self):
222        """Test stage"""
223        conf_stage = IdentificationStage.objects.create(
224            name=generate_id(),
225            user_fields=[
226                UserFields.USERNAME,
227            ],
228        )
229        stage = AuthenticatorDuoStage.objects.create(
230            name=generate_id(),
231            client_id=generate_id(),
232            client_secret=generate_id(),
233            api_hostname=generate_id(),
234        )
235        flow = create_test_flow()
236        FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
237        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
238
239        response = self.client.post(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
241            {"uid_field": self.user.username},
242        )
243        self.assertEqual(response.status_code, 302)
244
245        enroll_mock = MagicMock(
246            return_value={
247                "user_id": "foo",
248                "activation_barcode": "bar",
249                "activation_code": "bar",
250            }
251        )
252        with patch("duo_client.auth.Auth.enroll", enroll_mock):
253            response = self.client.get(
254                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
255                follow=True,
256            )
257            self.assertStageResponse(
258                response,
259                flow,
260                component="ak-stage-authenticator-duo",
261                pending_user=self.user.username,
262                activation_barcode="bar",
263                activation_code="bar",
264            )
265
266            response = self.client.get(
267                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
268                follow=True,
269            )
270            self.assertStageResponse(
271                response,
272                flow,
273                component="ak-stage-authenticator-duo",
274                pending_user=self.user.username,
275                activation_barcode="bar",
276                activation_code="bar",
277            )
278            self.assertEqual(enroll_mock.call_count, 1)
279
280            with patch("duo_client.auth.Auth.enroll_status", MagicMock(return_value="success")):
281                response = self.client.post(
282                    reverse(
283                        "authentik_api:authenticatorduostage-enrollment-status",
284                        kwargs={
285                            "pk": str(stage.pk),
286                        },
287                    )
288                )
289                self.assertEqual(response.status_code, 200)
290                self.assertJSONEqual(response.content, {"duo_response": "success"})
291
292                response = self.client.post(
293                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), {}
294                )
295                self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test duo stage

def setUp(self) -> None:
21    def setUp(self) -> None:
22        self.user = create_test_admin_user()
23        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_client(self):
25    def test_client(self):
26        """Test Duo client setup"""
27        stage = AuthenticatorDuoStage(
28            name=generate_id(),
29            client_id=generate_id(),
30            client_secret=generate_id(),
31            admin_integration_key=generate_id(),
32            admin_secret_key=generate_id(),
33            api_hostname=generate_id(),
34        )
35        self.assertEqual(stage.auth_client().ikey, stage.client_id)
36        self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)
37        stage.admin_integration_key = ""
38        with self.assertRaises(ValueError):
39            self.assertEqual(stage.admin_client().ikey, stage.admin_integration_key)

Test Duo client setup

def test_api_enrollment_invalid(self):
41    def test_api_enrollment_invalid(self):
42        """Test `enrollment_status`"""
43        self.client.force_login(self.user)
44        response = self.client.post(
45            reverse(
46                "authentik_api:authenticatorduostage-enrollment-status",
47                kwargs={
48                    "pk": str(uuid4()),
49                },
50            )
51        )
52        self.assertEqual(response.status_code, 404)

Test enrollment_status

def test_api_import_manual_invalid_username(self):
54    def test_api_import_manual_invalid_username(self):
55        """Test `import_device_manual`"""
56        self.client.force_login(self.user)
57        stage = AuthenticatorDuoStage.objects.create(
58            name=generate_id(),
59            client_id=generate_id(),
60            client_secret=generate_id(),
61            api_hostname=generate_id(),
62        )
63        response = self.client.post(
64            reverse(
65                "authentik_api:authenticatorduostage-import-device-manual",
66                kwargs={
67                    "pk": str(stage.pk),
68                },
69            ),
70            data={
71                "username": generate_id(),
72            },
73        )
74        self.assertEqual(response.status_code, 400)

Test import_device_manual

def test_api_import_manual_duplicate_device(self):
 76    def test_api_import_manual_duplicate_device(self):
 77        """Test `import_device_manual`"""
 78        self.client.force_login(self.user)
 79        stage = AuthenticatorDuoStage.objects.create(
 80            name=generate_id(),
 81            client_id=generate_id(),
 82            client_secret=generate_id(),
 83            api_hostname=generate_id(),
 84        )
 85        device = DuoDevice.objects.create(
 86            name="foo",
 87            duo_user_id=generate_id(),
 88            user=self.user,
 89            stage=stage,
 90        )
 91        response = self.client.post(
 92            reverse(
 93                "authentik_api:authenticatorduostage-import-device-manual",
 94                kwargs={
 95                    "pk": str(stage.pk),
 96                },
 97            ),
 98            data={
 99                "username": self.user.username,
100                "duo_user_id": device.duo_user_id,
101            },
102        )
103        self.assertEqual(response.status_code, 400)

Test import_device_manual

def test_api_import_manual(self):
105    def test_api_import_manual(self):
106        """Test `import_device_manual`"""
107        self.client.force_login(self.user)
108        stage = AuthenticatorDuoStage.objects.create(
109            name=generate_id(),
110            client_id=generate_id(),
111            client_secret=generate_id(),
112            api_hostname=generate_id(),
113        )
114        response = self.client.post(
115            reverse(
116                "authentik_api:authenticatorduostage-import-device-manual",
117                kwargs={
118                    "pk": str(stage.pk),
119                },
120            ),
121            data={
122                "username": self.user.username,
123                "duo_user_id": "foo",
124            },
125        )
126        self.assertEqual(response.status_code, 204)

Test import_device_manual

def test_api_import_automatic_invalid(self):
128    def test_api_import_automatic_invalid(self):
129        """test `import_devices_automatic`"""
130        self.client.force_login(self.user)
131        stage = AuthenticatorDuoStage.objects.create(
132            name=generate_id(),
133            client_id=generate_id(),
134            client_secret=generate_id(),
135            api_hostname=generate_id(),
136        )
137
138        # Test missing admin credentials
139        response = self.client.post(
140            reverse(
141                "authentik_api:authenticatorduostage-import-devices-automatic",
142                kwargs={
143                    "pk": str(stage.pk),
144                },
145            ),
146        )
147        self.assertEqual(response.status_code, 400)
148
149        # Test internal error handling
150        stage.admin_integration_key = generate_id()
151        stage.admin_secret_key = generate_id()
152        stage.save()
153        with patch(
154            "duo_client.admin.Admin.get_users_iterator",
155            MagicMock(side_effect=RuntimeError("Duo API error")),
156        ):
157            response = self.client.post(
158                reverse(
159                    "authentik_api:authenticatorduostage-import-devices-automatic",
160                    kwargs={
161                        "pk": str(stage.pk),
162                    },
163                ),
164            )
165            self.assertEqual(response.status_code, 400)
166            self.assertJSONEqual(
167                response.content,
168                {
169                    "error": "An internal error occurred while importing devices.",
170                    "count": 0,
171                },
172            )

Test import_devices_automatic

def test_api_import_automatic(self):
174    def test_api_import_automatic(self):
175        """test `import_devices_automatic`"""
176        self.client.force_login(self.user)
177        stage = AuthenticatorDuoStage.objects.create(
178            name=generate_id(),
179            client_id=generate_id(),
180            client_secret=generate_id(),
181            admin_integration_key=generate_id(),
182            admin_secret_key=generate_id(),
183            api_hostname=generate_id(),
184        )
185        device = DuoDevice.objects.create(
186            name="foo",
187            duo_user_id=generate_id(),
188            user=self.user,
189            stage=stage,
190        )
191        with patch(
192            "duo_client.admin.Admin.get_users_iterator",
193            MagicMock(
194                return_value=[
195                    {
196                        "user_id": "foo",
197                        "username": "bar",
198                    },
199                    {
200                        "user_id": device.duo_user_id,
201                        "username": self.user.username,
202                    },
203                    {
204                        "user_id": generate_id(),
205                        "username": self.user.username,
206                    },
207                ]
208            ),
209        ):
210            response = self.client.post(
211                reverse(
212                    "authentik_api:authenticatorduostage-import-devices-automatic",
213                    kwargs={
214                        "pk": str(stage.pk),
215                    },
216                ),
217            )
218            self.assertEqual(response.status_code, 200)
219            self.assertEqual(response.content.decode(), '{"error":"","count":1}')

Test import_devices_automatic

def test_stage_enroll_basic(self):
221    def test_stage_enroll_basic(self):
222        """Test stage"""
223        conf_stage = IdentificationStage.objects.create(
224            name=generate_id(),
225            user_fields=[
226                UserFields.USERNAME,
227            ],
228        )
229        stage = AuthenticatorDuoStage.objects.create(
230            name=generate_id(),
231            client_id=generate_id(),
232            client_secret=generate_id(),
233            api_hostname=generate_id(),
234        )
235        flow = create_test_flow()
236        FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
237        FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
238
239        response = self.client.post(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
241            {"uid_field": self.user.username},
242        )
243        self.assertEqual(response.status_code, 302)
244
245        enroll_mock = MagicMock(
246            return_value={
247                "user_id": "foo",
248                "activation_barcode": "bar",
249                "activation_code": "bar",
250            }
251        )
252        with patch("duo_client.auth.Auth.enroll", enroll_mock):
253            response = self.client.get(
254                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
255                follow=True,
256            )
257            self.assertStageResponse(
258                response,
259                flow,
260                component="ak-stage-authenticator-duo",
261                pending_user=self.user.username,
262                activation_barcode="bar",
263                activation_code="bar",
264            )
265
266            response = self.client.get(
267                reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
268                follow=True,
269            )
270            self.assertStageResponse(
271                response,
272                flow,
273                component="ak-stage-authenticator-duo",
274                pending_user=self.user.username,
275                activation_barcode="bar",
276                activation_code="bar",
277            )
278            self.assertEqual(enroll_mock.call_count, 1)
279
280            with patch("duo_client.auth.Auth.enroll_status", MagicMock(return_value="success")):
281                response = self.client.post(
282                    reverse(
283                        "authentik_api:authenticatorduostage-enrollment-status",
284                        kwargs={
285                            "pk": str(stage.pk),
286                        },
287                    )
288                )
289                self.assertEqual(response.status_code, 200)
290                self.assertJSONEqual(response.content, {"duo_response": "success"})
291
292                response = self.client.post(
293                    reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}), {}
294                )
295                self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test stage