authentik.stages.authenticator_validate.tests.test_duo

Test validator stage

  1"""Test validator stage"""
  2
  3from unittest.mock import MagicMock, patch
  4
  5from django.urls import reverse
  6from rest_framework.exceptions import ValidationError
  7
  8from authentik.brands.utils import get_brand_for_request
  9from authentik.core.middleware import RESPONSE_HEADER_ID
 10from authentik.core.tests.utils import RequestFactory, create_test_admin_user, create_test_flow
 11from authentik.events.models import Event, EventAction
 12from authentik.flows.models import FlowDesignation, FlowStageBinding
 13from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 14from authentik.flows.stage import StageView
 15from authentik.flows.tests import FlowTestCase
 16from authentik.flows.views.executor import SESSION_KEY_PLAN, FlowExecutorView
 17from authentik.lib.generators import generate_id, generate_key
 18from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
 19from authentik.stages.authenticator_validate.challenge import validate_challenge_duo
 20from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 21from authentik.stages.user_login.models import UserLoginStage
 22
 23
 24class AuthenticatorValidateStageDuoTests(FlowTestCase):
 25    """Test validator stage"""
 26
 27    def setUp(self) -> None:
 28        self.user = create_test_admin_user()
 29        self.request_factory = RequestFactory()
 30
 31    def test_device_challenge_duo(self):
 32        """Test duo"""
 33        request = self.request_factory.get("/")
 34
 35        request.brand = get_brand_for_request(request)
 36
 37        stage = AuthenticatorDuoStage.objects.create(
 38            name=generate_id(),
 39            client_id=generate_id(),
 40            client_secret=generate_key(),
 41            api_hostname="",
 42        )
 43        duo_device = DuoDevice.objects.create(
 44            user=self.user,
 45            stage=stage,
 46        )
 47        with patch(
 48            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 49            MagicMock(
 50                return_value=MagicMock(
 51                    auth=MagicMock(
 52                        return_value={
 53                            "result": "allow",
 54                            "status": "allow",
 55                            "status_msg": "Success. Logging you in...",
 56                        }
 57                    )
 58                )
 59            ),
 60        ):
 61            self.assertEqual(
 62                duo_device,
 63                validate_challenge_duo(
 64                    duo_device.pk,
 65                    StageView(
 66                        FlowExecutorView(
 67                            current_stage=stage,
 68                            plan=FlowPlan(generate_id(), [], {}),
 69                        ),
 70                        request=request,
 71                    ),
 72                    self.user,
 73                ),
 74            )
 75        with patch(
 76            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 77            MagicMock(
 78                return_value=MagicMock(
 79                    auth=MagicMock(
 80                        return_value={
 81                            "result": "deny",
 82                            "status": "deny",
 83                            "status_msg": "foo",
 84                        }
 85                    )
 86                )
 87            ),
 88        ):
 89            with self.assertRaises(ValidationError):
 90                validate_challenge_duo(
 91                    duo_device.pk,
 92                    StageView(
 93                        FlowExecutorView(
 94                            current_stage=stage,
 95                            plan=FlowPlan(generate_id(), [], {}),
 96                        ),
 97                        request=request,
 98                    ),
 99                    self.user,
100                )
101
102    @patch(
103        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
104        MagicMock(
105            return_value=MagicMock(
106                auth=MagicMock(
107                    return_value={
108                        "result": "allow",
109                        "status": "allow",
110                        "status_msg": "Success. Logging you in...",
111                    }
112                )
113            )
114        ),
115    )
116    def test_full(self):
117        """Test full within a flow executor"""
118        duo_stage = AuthenticatorDuoStage.objects.create(
119            name=generate_id(),
120            client_id=generate_id(),
121            client_secret=generate_key(),
122            api_hostname="",
123        )
124        duo_device = DuoDevice.objects.create(
125            user=self.user,
126            stage=duo_stage,
127        )
128
129        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
130        stage = AuthenticatorValidateStage.objects.create(
131            name=generate_id(),
132            device_classes=[DeviceClasses.DUO],
133        )
134
135        plan = FlowPlan(flow_pk=flow.pk.hex)
136        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
137        plan.append(
138            FlowStageBinding.objects.create(
139                target=flow, stage=UserLoginStage.objects.create(name=generate_id()), order=3
140            )
141        )
142        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
143        session = self.client.session
144        session[SESSION_KEY_PLAN] = plan
145        session.save()
146
147        response = self.client.get(
148            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
149        )
150        self.assertEqual(response.status_code, 200)
151
152        response = self.client.post(
153            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
154            {"duo": duo_device.pk},
155            follow=True,
156        )
157
158        self.assertEqual(response.status_code, 200)
159        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
160        event = Event.objects.filter(
161            action=EventAction.LOGIN,
162            user__pk=self.user.pk,
163        ).first()
164        self.assertIsNotNone(event)
165        self.assertEqual(
166            event.context,
167            {
168                "auth_method": "auth_mfa",
169                "auth_method_args": {
170                    "known_device": False,
171                    "mfa_devices": [
172                        {
173                            "app": "authentik_stages_authenticator_duo",
174                            "model_name": "duodevice",
175                            "name": "",
176                            "pk": duo_device.pk,
177                        }
178                    ],
179                },
180                "http_request": {
181                    "args": {},
182                    "method": "GET",
183                    "path": f"/api/v3/flows/executor/{flow.slug}/",
184                    "user_agent": "",
185                    "request_id": response[RESPONSE_HEADER_ID],
186                },
187            },
188        )
class AuthenticatorValidateStageDuoTests(authentik.flows.tests.FlowTestCase):
 25class AuthenticatorValidateStageDuoTests(FlowTestCase):
 26    """Test validator stage"""
 27
 28    def setUp(self) -> None:
 29        self.user = create_test_admin_user()
 30        self.request_factory = RequestFactory()
 31
 32    def test_device_challenge_duo(self):
 33        """Test duo"""
 34        request = self.request_factory.get("/")
 35
 36        request.brand = get_brand_for_request(request)
 37
 38        stage = AuthenticatorDuoStage.objects.create(
 39            name=generate_id(),
 40            client_id=generate_id(),
 41            client_secret=generate_key(),
 42            api_hostname="",
 43        )
 44        duo_device = DuoDevice.objects.create(
 45            user=self.user,
 46            stage=stage,
 47        )
 48        with patch(
 49            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 50            MagicMock(
 51                return_value=MagicMock(
 52                    auth=MagicMock(
 53                        return_value={
 54                            "result": "allow",
 55                            "status": "allow",
 56                            "status_msg": "Success. Logging you in...",
 57                        }
 58                    )
 59                )
 60            ),
 61        ):
 62            self.assertEqual(
 63                duo_device,
 64                validate_challenge_duo(
 65                    duo_device.pk,
 66                    StageView(
 67                        FlowExecutorView(
 68                            current_stage=stage,
 69                            plan=FlowPlan(generate_id(), [], {}),
 70                        ),
 71                        request=request,
 72                    ),
 73                    self.user,
 74                ),
 75            )
 76        with patch(
 77            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 78            MagicMock(
 79                return_value=MagicMock(
 80                    auth=MagicMock(
 81                        return_value={
 82                            "result": "deny",
 83                            "status": "deny",
 84                            "status_msg": "foo",
 85                        }
 86                    )
 87                )
 88            ),
 89        ):
 90            with self.assertRaises(ValidationError):
 91                validate_challenge_duo(
 92                    duo_device.pk,
 93                    StageView(
 94                        FlowExecutorView(
 95                            current_stage=stage,
 96                            plan=FlowPlan(generate_id(), [], {}),
 97                        ),
 98                        request=request,
 99                    ),
100                    self.user,
101                )
102
103    @patch(
104        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
105        MagicMock(
106            return_value=MagicMock(
107                auth=MagicMock(
108                    return_value={
109                        "result": "allow",
110                        "status": "allow",
111                        "status_msg": "Success. Logging you in...",
112                    }
113                )
114            )
115        ),
116    )
117    def test_full(self):
118        """Test full within a flow executor"""
119        duo_stage = AuthenticatorDuoStage.objects.create(
120            name=generate_id(),
121            client_id=generate_id(),
122            client_secret=generate_key(),
123            api_hostname="",
124        )
125        duo_device = DuoDevice.objects.create(
126            user=self.user,
127            stage=duo_stage,
128        )
129
130        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
131        stage = AuthenticatorValidateStage.objects.create(
132            name=generate_id(),
133            device_classes=[DeviceClasses.DUO],
134        )
135
136        plan = FlowPlan(flow_pk=flow.pk.hex)
137        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
138        plan.append(
139            FlowStageBinding.objects.create(
140                target=flow, stage=UserLoginStage.objects.create(name=generate_id()), order=3
141            )
142        )
143        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
144        session = self.client.session
145        session[SESSION_KEY_PLAN] = plan
146        session.save()
147
148        response = self.client.get(
149            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
150        )
151        self.assertEqual(response.status_code, 200)
152
153        response = self.client.post(
154            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
155            {"duo": duo_device.pk},
156            follow=True,
157        )
158
159        self.assertEqual(response.status_code, 200)
160        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
161        event = Event.objects.filter(
162            action=EventAction.LOGIN,
163            user__pk=self.user.pk,
164        ).first()
165        self.assertIsNotNone(event)
166        self.assertEqual(
167            event.context,
168            {
169                "auth_method": "auth_mfa",
170                "auth_method_args": {
171                    "known_device": False,
172                    "mfa_devices": [
173                        {
174                            "app": "authentik_stages_authenticator_duo",
175                            "model_name": "duodevice",
176                            "name": "",
177                            "pk": duo_device.pk,
178                        }
179                    ],
180                },
181                "http_request": {
182                    "args": {},
183                    "method": "GET",
184                    "path": f"/api/v3/flows/executor/{flow.slug}/",
185                    "user_agent": "",
186                    "request_id": response[RESPONSE_HEADER_ID],
187                },
188            },
189        )

Test validator stage

def setUp(self) -> None:
28    def setUp(self) -> None:
29        self.user = create_test_admin_user()
30        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_device_challenge_duo(self):
 32    def test_device_challenge_duo(self):
 33        """Test duo"""
 34        request = self.request_factory.get("/")
 35
 36        request.brand = get_brand_for_request(request)
 37
 38        stage = AuthenticatorDuoStage.objects.create(
 39            name=generate_id(),
 40            client_id=generate_id(),
 41            client_secret=generate_key(),
 42            api_hostname="",
 43        )
 44        duo_device = DuoDevice.objects.create(
 45            user=self.user,
 46            stage=stage,
 47        )
 48        with patch(
 49            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 50            MagicMock(
 51                return_value=MagicMock(
 52                    auth=MagicMock(
 53                        return_value={
 54                            "result": "allow",
 55                            "status": "allow",
 56                            "status_msg": "Success. Logging you in...",
 57                        }
 58                    )
 59                )
 60            ),
 61        ):
 62            self.assertEqual(
 63                duo_device,
 64                validate_challenge_duo(
 65                    duo_device.pk,
 66                    StageView(
 67                        FlowExecutorView(
 68                            current_stage=stage,
 69                            plan=FlowPlan(generate_id(), [], {}),
 70                        ),
 71                        request=request,
 72                    ),
 73                    self.user,
 74                ),
 75            )
 76        with patch(
 77            "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
 78            MagicMock(
 79                return_value=MagicMock(
 80                    auth=MagicMock(
 81                        return_value={
 82                            "result": "deny",
 83                            "status": "deny",
 84                            "status_msg": "foo",
 85                        }
 86                    )
 87                )
 88            ),
 89        ):
 90            with self.assertRaises(ValidationError):
 91                validate_challenge_duo(
 92                    duo_device.pk,
 93                    StageView(
 94                        FlowExecutorView(
 95                            current_stage=stage,
 96                            plan=FlowPlan(generate_id(), [], {}),
 97                        ),
 98                        request=request,
 99                    ),
100                    self.user,
101                )

Test duo

@patch('authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client', MagicMock(return_value=MagicMock(auth=MagicMock(return_value={'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}))))
def test_full(self):
103    @patch(
104        "authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.auth_client",
105        MagicMock(
106            return_value=MagicMock(
107                auth=MagicMock(
108                    return_value={
109                        "result": "allow",
110                        "status": "allow",
111                        "status_msg": "Success. Logging you in...",
112                    }
113                )
114            )
115        ),
116    )
117    def test_full(self):
118        """Test full within a flow executor"""
119        duo_stage = AuthenticatorDuoStage.objects.create(
120            name=generate_id(),
121            client_id=generate_id(),
122            client_secret=generate_key(),
123            api_hostname="",
124        )
125        duo_device = DuoDevice.objects.create(
126            user=self.user,
127            stage=duo_stage,
128        )
129
130        flow = create_test_flow(FlowDesignation.AUTHENTICATION)
131        stage = AuthenticatorValidateStage.objects.create(
132            name=generate_id(),
133            device_classes=[DeviceClasses.DUO],
134        )
135
136        plan = FlowPlan(flow_pk=flow.pk.hex)
137        plan.append(FlowStageBinding.objects.create(target=flow, stage=stage, order=2))
138        plan.append(
139            FlowStageBinding.objects.create(
140                target=flow, stage=UserLoginStage.objects.create(name=generate_id()), order=3
141            )
142        )
143        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
144        session = self.client.session
145        session[SESSION_KEY_PLAN] = plan
146        session.save()
147
148        response = self.client.get(
149            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
150        )
151        self.assertEqual(response.status_code, 200)
152
153        response = self.client.post(
154            reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
155            {"duo": duo_device.pk},
156            follow=True,
157        )
158
159        self.assertEqual(response.status_code, 200)
160        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
161        event = Event.objects.filter(
162            action=EventAction.LOGIN,
163            user__pk=self.user.pk,
164        ).first()
165        self.assertIsNotNone(event)
166        self.assertEqual(
167            event.context,
168            {
169                "auth_method": "auth_mfa",
170                "auth_method_args": {
171                    "known_device": False,
172                    "mfa_devices": [
173                        {
174                            "app": "authentik_stages_authenticator_duo",
175                            "model_name": "duodevice",
176                            "name": "",
177                            "pk": duo_device.pk,
178                        }
179                    ],
180                },
181                "http_request": {
182                    "args": {},
183                    "method": "GET",
184                    "path": f"/api/v3/flows/executor/{flow.slug}/",
185                    "user_agent": "",
186                    "request_id": response[RESPONSE_HEADER_ID],
187                },
188            },
189        )

Test full within a flow executor