authentik.stages.user_login.tests

login tests

  1"""login tests"""
  2
  3from time import sleep
  4from unittest.mock import patch
  5
  6from django.http import HttpRequest
  7from django.urls import reverse
  8from django.utils.timezone import now
  9
 10from authentik.blueprints.tests import apply_blueprint
 11from authentik.core.models import AuthenticatedSession, Session
 12from authentik.core.tests.utils import create_test_flow, create_test_user
 13from authentik.events.models import Event, EventAction
 14from authentik.events.utils import get_user
 15from authentik.flows.markers import StageMarker
 16from authentik.flows.models import FlowDesignation, FlowStageBinding
 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 18from authentik.flows.tests import FlowTestCase
 19from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK
 20from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_PLAN
 21from authentik.lib.generators import generate_id
 22from authentik.lib.utils.time import timedelta_from_string
 23from authentik.root.middleware import ClientIPMiddleware
 24from authentik.stages.user_login.middleware import (
 25    SESSION_KEY_BINDING_NET,
 26    BoundSessionMiddleware,
 27    SessionBindingBroken,
 28    logout_extra,
 29)
 30from authentik.stages.user_login.models import GeoIPBinding, NetworkBinding, UserLoginStage
 31
 32
 33class TestUserLoginStage(FlowTestCase):
 34    """Login tests"""
 35
 36    def setUp(self):
 37        super().setUp()
 38        self.user = create_test_user()
 39
 40        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 41        self.stage = UserLoginStage.objects.create(name="login")
 42        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 43
 44    def test_valid_get(self):
 45        """Test with a valid pending user and backend"""
 46        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 47        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 48        session = self.client.session
 49        session[SESSION_KEY_PLAN] = plan
 50        session.save()
 51
 52        response = self.client.get(
 53            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 54        )
 55
 56        self.assertEqual(response.status_code, 200)
 57        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 58
 59    def test_valid_post(self):
 60        """Test with a valid pending user and backend"""
 61        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 62        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 63        session = self.client.session
 64        session[SESSION_KEY_PLAN] = plan
 65        session.save()
 66
 67        response = self.client.post(
 68            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 69        )
 70
 71        self.assertEqual(response.status_code, 200)
 72        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 73
 74    def test_terminate_other_sessions(self):
 75        """Test terminate_other_sessions"""
 76        self.stage.terminate_other_sessions = True
 77        self.stage.save()
 78        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 79        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 80        session = self.client.session
 81        session[SESSION_KEY_PLAN] = plan
 82        session.save()
 83
 84        key = generate_id()
 85        AuthenticatedSession.objects.create(
 86            session=Session.objects.create(
 87                session_key=key,
 88                last_ip=ClientIPMiddleware.default_ip,
 89            ),
 90            user=self.user,
 91        )
 92
 93        response = self.client.post(
 94            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 95        )
 96
 97        self.assertEqual(response.status_code, 200)
 98        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 99        self.assertFalse(AuthenticatedSession.objects.filter(session__session_key=key))
100        self.assertFalse(Session.objects.filter(session_key=key).exists())
101
102    def test_expiry(self):
103        """Test with expiry"""
104        self.stage.session_duration = "seconds=2"
105        self.stage.save()
106        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
107        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
108        session = self.client.session
109        session[SESSION_KEY_PLAN] = plan
110        session.save()
111
112        before_request = now()
113        response = self.client.get(
114            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
115        )
116        self.assertEqual(response.status_code, 200)
117        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
118        self.assertNotEqual(list(self.client.session.keys()), [])
119        session_key = self.client.session.session_key
120        session = Session.objects.filter(session_key=session_key).first()
121        self.assertAlmostEqual(
122            session.expires.timestamp() - before_request.timestamp(),
123            timedelta_from_string(self.stage.session_duration).total_seconds(),
124            delta=1,
125        )
126        sleep(3)
127        self.client.session.clear_expired()
128        self.assertEqual(list(self.client.session.keys()), [])
129
130    def test_expiry_remember(self):
131        """Test with expiry"""
132        self.stage.session_duration = "seconds=2"
133        self.stage.remember_me_offset = "seconds=2"
134        self.stage.save()
135        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
136        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
137        session = self.client.session
138        session[SESSION_KEY_PLAN] = plan
139        session.save()
140
141        response = self.client.get(
142            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
143        )
144        self.assertStageResponse(response, component="ak-stage-user-login")
145
146        response = self.client.post(
147            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
148            data={"remember_me": True},
149        )
150        _now = now().timestamp()
151        self.assertEqual(response.status_code, 200)
152        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
153        self.assertNotEqual(list(self.client.session.keys()), [])
154        session_key = self.client.session.session_key
155        session = Session.objects.filter(session_key=session_key).first()
156        self.assertAlmostEqual(
157            session.expires.timestamp() - _now,
158            timedelta_from_string(self.stage.session_duration).total_seconds()
159            + timedelta_from_string(self.stage.remember_me_offset).total_seconds(),
160            delta=1,
161        )
162        sleep(5)
163        self.client.session.clear_expired()
164        self.assertEqual(list(self.client.session.keys()), [])
165
166    @patch(
167        "authentik.flows.views.executor.to_stage_response",
168        TO_STAGE_RESPONSE_MOCK,
169    )
170    def test_without_user(self):
171        """Test a plan without any pending user, resulting in a denied"""
172        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
173        session = self.client.session
174        session[SESSION_KEY_PLAN] = plan
175        session.save()
176
177        response = self.client.get(
178            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
179        )
180
181        self.assertStageResponse(
182            response,
183            self.flow,
184            component="ak-stage-access-denied",
185        )
186
187    @apply_blueprint("default/flow-default-user-settings-flow.yaml")
188    def test_inactive_account(self):
189        """Test with a valid pending user and backend"""
190        self.user.is_active = False
191        self.user.save()
192        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
193        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
194        session = self.client.session
195        session[SESSION_KEY_PLAN] = plan
196        session.save()
197
198        response = self.client.get(
199            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
200        )
201        self.assertEqual(response.status_code, 200)
202        self.assertStageResponse(
203            response, self.flow, component="ak-stage-access-denied", error_message="Unknown error"
204        )
205
206        # Check that API requests get rejected
207        response = self.client.get(reverse("authentik_api:application-list"))
208        self.assertEqual(response.status_code, 403)
209
210        # Check that flow requests requiring a user also get rejected
211        response = self.client.get(
212            reverse(
213                "authentik_api:flow-executor",
214                kwargs={"flow_slug": "default-user-settings-flow"},
215            )
216        )
217        self.assertStageResponse(
218            response,
219            self.flow,
220            component="ak-stage-access-denied",
221            error_message="Flow does not apply to current user.",
222        )
223
224    def test_binding_net_break_log(self):
225        """Test logout_extra with exception"""
226        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
227        for args, expect in [
228            [[NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"], ["network.missing"]],
229            [[NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"], ["network.asn"]],
230            [
231                [NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"],
232                ["network.asn_network"],
233            ],
234            [[NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"], ["network.ip"]],
235        ]:
236            with self.subTest(args[0]):
237                with self.assertRaises(SessionBindingBroken) as cm:
238                    BoundSessionMiddleware.recheck_session_net(*args)
239                self.assertEqual(cm.exception.reason, expect[0])
240                # Ensure the request can be logged without throwing errors
241                self.client.force_login(self.user)
242                request = HttpRequest()
243                request.session = self.client.session
244                request.user = self.user
245                logout_extra(request, cm.exception)
246
247    def test_binding_geo_break_log(self):
248        """Test logout_extra with exception"""
249        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
250        for args, expect in [
251            [[GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"], ["geoip.missing"]],
252            [[GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"], ["geoip.continent"]],
253            [
254                [GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"],
255                ["geoip.country"],
256            ],
257            [
258                [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"],
259                ["geoip.city"],
260            ],
261        ]:
262            with self.subTest(args[0]):
263                with self.assertRaises(SessionBindingBroken) as cm:
264                    BoundSessionMiddleware.recheck_session_geo(*args)
265                self.assertEqual(cm.exception.reason, expect[0])
266                # Ensure the request can be logged without throwing errors
267                self.client.force_login(self.user)
268                request = HttpRequest()
269                request.session = self.client.session
270                request.user = self.user
271                logout_extra(request, cm.exception)
272
273    def test_session_binding_broken(self):
274        """Test session binding"""
275        Event.objects.all().delete()
276        self.client.force_login(self.user)
277        session = self.client.session
278        session[Session.Keys.LAST_IP] = "192.0.2.1"
279        session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
280        session.save()
281
282        res = self.client.get(reverse("authentik_api:user-me"))
283        self.assertEqual(res.status_code, 302)
284        self.assertEqual(
285            res.url,
286            reverse(
287                "authentik_flows:default-authentication",
288            )
289            + f"?{NEXT_ARG_NAME}={reverse("authentik_api:user-me")}",
290        )
291        event = Event.objects.filter(action=EventAction.LOGOUT).first()
292        self.assertEqual(event.user, get_user(self.user))
class TestUserLoginStage(authentik.flows.tests.FlowTestCase):
 34class TestUserLoginStage(FlowTestCase):
 35    """Login tests"""
 36
 37    def setUp(self):
 38        super().setUp()
 39        self.user = create_test_user()
 40
 41        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 42        self.stage = UserLoginStage.objects.create(name="login")
 43        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 44
 45    def test_valid_get(self):
 46        """Test with a valid pending user and backend"""
 47        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 48        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 49        session = self.client.session
 50        session[SESSION_KEY_PLAN] = plan
 51        session.save()
 52
 53        response = self.client.get(
 54            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 55        )
 56
 57        self.assertEqual(response.status_code, 200)
 58        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 59
 60    def test_valid_post(self):
 61        """Test with a valid pending user and backend"""
 62        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 63        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 64        session = self.client.session
 65        session[SESSION_KEY_PLAN] = plan
 66        session.save()
 67
 68        response = self.client.post(
 69            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 70        )
 71
 72        self.assertEqual(response.status_code, 200)
 73        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 74
 75    def test_terminate_other_sessions(self):
 76        """Test terminate_other_sessions"""
 77        self.stage.terminate_other_sessions = True
 78        self.stage.save()
 79        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 80        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 81        session = self.client.session
 82        session[SESSION_KEY_PLAN] = plan
 83        session.save()
 84
 85        key = generate_id()
 86        AuthenticatedSession.objects.create(
 87            session=Session.objects.create(
 88                session_key=key,
 89                last_ip=ClientIPMiddleware.default_ip,
 90            ),
 91            user=self.user,
 92        )
 93
 94        response = self.client.post(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 96        )
 97
 98        self.assertEqual(response.status_code, 200)
 99        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
100        self.assertFalse(AuthenticatedSession.objects.filter(session__session_key=key))
101        self.assertFalse(Session.objects.filter(session_key=key).exists())
102
103    def test_expiry(self):
104        """Test with expiry"""
105        self.stage.session_duration = "seconds=2"
106        self.stage.save()
107        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
108        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
109        session = self.client.session
110        session[SESSION_KEY_PLAN] = plan
111        session.save()
112
113        before_request = now()
114        response = self.client.get(
115            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
116        )
117        self.assertEqual(response.status_code, 200)
118        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
119        self.assertNotEqual(list(self.client.session.keys()), [])
120        session_key = self.client.session.session_key
121        session = Session.objects.filter(session_key=session_key).first()
122        self.assertAlmostEqual(
123            session.expires.timestamp() - before_request.timestamp(),
124            timedelta_from_string(self.stage.session_duration).total_seconds(),
125            delta=1,
126        )
127        sleep(3)
128        self.client.session.clear_expired()
129        self.assertEqual(list(self.client.session.keys()), [])
130
131    def test_expiry_remember(self):
132        """Test with expiry"""
133        self.stage.session_duration = "seconds=2"
134        self.stage.remember_me_offset = "seconds=2"
135        self.stage.save()
136        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
137        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
138        session = self.client.session
139        session[SESSION_KEY_PLAN] = plan
140        session.save()
141
142        response = self.client.get(
143            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
144        )
145        self.assertStageResponse(response, component="ak-stage-user-login")
146
147        response = self.client.post(
148            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
149            data={"remember_me": True},
150        )
151        _now = now().timestamp()
152        self.assertEqual(response.status_code, 200)
153        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
154        self.assertNotEqual(list(self.client.session.keys()), [])
155        session_key = self.client.session.session_key
156        session = Session.objects.filter(session_key=session_key).first()
157        self.assertAlmostEqual(
158            session.expires.timestamp() - _now,
159            timedelta_from_string(self.stage.session_duration).total_seconds()
160            + timedelta_from_string(self.stage.remember_me_offset).total_seconds(),
161            delta=1,
162        )
163        sleep(5)
164        self.client.session.clear_expired()
165        self.assertEqual(list(self.client.session.keys()), [])
166
167    @patch(
168        "authentik.flows.views.executor.to_stage_response",
169        TO_STAGE_RESPONSE_MOCK,
170    )
171    def test_without_user(self):
172        """Test a plan without any pending user, resulting in a denied"""
173        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
174        session = self.client.session
175        session[SESSION_KEY_PLAN] = plan
176        session.save()
177
178        response = self.client.get(
179            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
180        )
181
182        self.assertStageResponse(
183            response,
184            self.flow,
185            component="ak-stage-access-denied",
186        )
187
188    @apply_blueprint("default/flow-default-user-settings-flow.yaml")
189    def test_inactive_account(self):
190        """Test with a valid pending user and backend"""
191        self.user.is_active = False
192        self.user.save()
193        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
194        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
195        session = self.client.session
196        session[SESSION_KEY_PLAN] = plan
197        session.save()
198
199        response = self.client.get(
200            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
201        )
202        self.assertEqual(response.status_code, 200)
203        self.assertStageResponse(
204            response, self.flow, component="ak-stage-access-denied", error_message="Unknown error"
205        )
206
207        # Check that API requests get rejected
208        response = self.client.get(reverse("authentik_api:application-list"))
209        self.assertEqual(response.status_code, 403)
210
211        # Check that flow requests requiring a user also get rejected
212        response = self.client.get(
213            reverse(
214                "authentik_api:flow-executor",
215                kwargs={"flow_slug": "default-user-settings-flow"},
216            )
217        )
218        self.assertStageResponse(
219            response,
220            self.flow,
221            component="ak-stage-access-denied",
222            error_message="Flow does not apply to current user.",
223        )
224
225    def test_binding_net_break_log(self):
226        """Test logout_extra with exception"""
227        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
228        for args, expect in [
229            [[NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"], ["network.missing"]],
230            [[NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"], ["network.asn"]],
231            [
232                [NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"],
233                ["network.asn_network"],
234            ],
235            [[NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"], ["network.ip"]],
236        ]:
237            with self.subTest(args[0]):
238                with self.assertRaises(SessionBindingBroken) as cm:
239                    BoundSessionMiddleware.recheck_session_net(*args)
240                self.assertEqual(cm.exception.reason, expect[0])
241                # Ensure the request can be logged without throwing errors
242                self.client.force_login(self.user)
243                request = HttpRequest()
244                request.session = self.client.session
245                request.user = self.user
246                logout_extra(request, cm.exception)
247
248    def test_binding_geo_break_log(self):
249        """Test logout_extra with exception"""
250        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
251        for args, expect in [
252            [[GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"], ["geoip.missing"]],
253            [[GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"], ["geoip.continent"]],
254            [
255                [GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"],
256                ["geoip.country"],
257            ],
258            [
259                [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"],
260                ["geoip.city"],
261            ],
262        ]:
263            with self.subTest(args[0]):
264                with self.assertRaises(SessionBindingBroken) as cm:
265                    BoundSessionMiddleware.recheck_session_geo(*args)
266                self.assertEqual(cm.exception.reason, expect[0])
267                # Ensure the request can be logged without throwing errors
268                self.client.force_login(self.user)
269                request = HttpRequest()
270                request.session = self.client.session
271                request.user = self.user
272                logout_extra(request, cm.exception)
273
274    def test_session_binding_broken(self):
275        """Test session binding"""
276        Event.objects.all().delete()
277        self.client.force_login(self.user)
278        session = self.client.session
279        session[Session.Keys.LAST_IP] = "192.0.2.1"
280        session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
281        session.save()
282
283        res = self.client.get(reverse("authentik_api:user-me"))
284        self.assertEqual(res.status_code, 302)
285        self.assertEqual(
286            res.url,
287            reverse(
288                "authentik_flows:default-authentication",
289            )
290            + f"?{NEXT_ARG_NAME}={reverse("authentik_api:user-me")}",
291        )
292        event = Event.objects.filter(action=EventAction.LOGOUT).first()
293        self.assertEqual(event.user, get_user(self.user))

Login tests

def setUp(self):
37    def setUp(self):
38        super().setUp()
39        self.user = create_test_user()
40
41        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
42        self.stage = UserLoginStage.objects.create(name="login")
43        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)

Hook method for setting up the test fixture before exercising it.

def test_valid_get(self):
45    def test_valid_get(self):
46        """Test with a valid pending user and backend"""
47        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
48        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
49        session = self.client.session
50        session[SESSION_KEY_PLAN] = plan
51        session.save()
52
53        response = self.client.get(
54            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
55        )
56
57        self.assertEqual(response.status_code, 200)
58        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with a valid pending user and backend

def test_valid_post(self):
60    def test_valid_post(self):
61        """Test with a valid pending user and backend"""
62        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
63        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
64        session = self.client.session
65        session[SESSION_KEY_PLAN] = plan
66        session.save()
67
68        response = self.client.post(
69            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
70        )
71
72        self.assertEqual(response.status_code, 200)
73        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

Test with a valid pending user and backend

def test_terminate_other_sessions(self):
 75    def test_terminate_other_sessions(self):
 76        """Test terminate_other_sessions"""
 77        self.stage.terminate_other_sessions = True
 78        self.stage.save()
 79        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 80        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 81        session = self.client.session
 82        session[SESSION_KEY_PLAN] = plan
 83        session.save()
 84
 85        key = generate_id()
 86        AuthenticatedSession.objects.create(
 87            session=Session.objects.create(
 88                session_key=key,
 89                last_ip=ClientIPMiddleware.default_ip,
 90            ),
 91            user=self.user,
 92        )
 93
 94        response = self.client.post(
 95            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 96        )
 97
 98        self.assertEqual(response.status_code, 200)
 99        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
100        self.assertFalse(AuthenticatedSession.objects.filter(session__session_key=key))
101        self.assertFalse(Session.objects.filter(session_key=key).exists())

Test terminate_other_sessions

def test_expiry(self):
103    def test_expiry(self):
104        """Test with expiry"""
105        self.stage.session_duration = "seconds=2"
106        self.stage.save()
107        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
108        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
109        session = self.client.session
110        session[SESSION_KEY_PLAN] = plan
111        session.save()
112
113        before_request = now()
114        response = self.client.get(
115            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
116        )
117        self.assertEqual(response.status_code, 200)
118        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
119        self.assertNotEqual(list(self.client.session.keys()), [])
120        session_key = self.client.session.session_key
121        session = Session.objects.filter(session_key=session_key).first()
122        self.assertAlmostEqual(
123            session.expires.timestamp() - before_request.timestamp(),
124            timedelta_from_string(self.stage.session_duration).total_seconds(),
125            delta=1,
126        )
127        sleep(3)
128        self.client.session.clear_expired()
129        self.assertEqual(list(self.client.session.keys()), [])

Test with expiry

def test_expiry_remember(self):
131    def test_expiry_remember(self):
132        """Test with expiry"""
133        self.stage.session_duration = "seconds=2"
134        self.stage.remember_me_offset = "seconds=2"
135        self.stage.save()
136        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
137        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
138        session = self.client.session
139        session[SESSION_KEY_PLAN] = plan
140        session.save()
141
142        response = self.client.get(
143            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
144        )
145        self.assertStageResponse(response, component="ak-stage-user-login")
146
147        response = self.client.post(
148            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
149            data={"remember_me": True},
150        )
151        _now = now().timestamp()
152        self.assertEqual(response.status_code, 200)
153        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
154        self.assertNotEqual(list(self.client.session.keys()), [])
155        session_key = self.client.session.session_key
156        session = Session.objects.filter(session_key=session_key).first()
157        self.assertAlmostEqual(
158            session.expires.timestamp() - _now,
159            timedelta_from_string(self.stage.session_duration).total_seconds()
160            + timedelta_from_string(self.stage.remember_me_offset).total_seconds(),
161            delta=1,
162        )
163        sleep(5)
164        self.client.session.clear_expired()
165        self.assertEqual(list(self.client.session.keys()), [])

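Test session expiry when the user submits remember_me: the session must last for the session duration plus the remember-me offset.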

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_without_user(self):
167    @patch(
168        "authentik.flows.views.executor.to_stage_response",
169        TO_STAGE_RESPONSE_MOCK,
170    )
171    def test_without_user(self):
172        """Test a plan without any pending user, resulting in a denied"""
173        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
174        session = self.client.session
175        session[SESSION_KEY_PLAN] = plan
176        session.save()
177
178        response = self.client.get(
179            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
180        )
181
182        self.assertStageResponse(
183            response,
184            self.flow,
185            component="ak-stage-access-denied",
186        )

Test a plan without any pending user, which results in an access-denied response

@apply_blueprint('default/flow-default-user-settings-flow.yaml')
def test_inactive_account(self):
188    @apply_blueprint("default/flow-default-user-settings-flow.yaml")
189    def test_inactive_account(self):
190        """Test with a valid pending user and backend"""
191        self.user.is_active = False
192        self.user.save()
193        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
194        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
195        session = self.client.session
196        session[SESSION_KEY_PLAN] = plan
197        session.save()
198
199        response = self.client.get(
200            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
201        )
202        self.assertEqual(response.status_code, 200)
203        self.assertStageResponse(
204            response, self.flow, component="ak-stage-access-denied", error_message="Unknown error"
205        )
206
207        # Check that API requests get rejected
208        response = self.client.get(reverse("authentik_api:application-list"))
209        self.assertEqual(response.status_code, 403)
210
211        # Check that flow requests requiring a user also get rejected
212        response = self.client.get(
213            reverse(
214                "authentik_api:flow-executor",
215                kwargs={"flow_slug": "default-user-settings-flow"},
216            )
217        )
218        self.assertStageResponse(
219            response,
220            self.flow,
221            component="ak-stage-access-denied",
222            error_message="Flow does not apply to current user.",
223        )

Test with a valid pending user and backend

def test_binding_net_break_log(self):
225    def test_binding_net_break_log(self):
226        """Test logout_extra with exception"""
227        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
228        for args, expect in [
229            [[NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"], ["network.missing"]],
230            [[NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"], ["network.asn"]],
231            [
232                [NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"],
233                ["network.asn_network"],
234            ],
235            [[NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"], ["network.ip"]],
236        ]:
237            with self.subTest(args[0]):
238                with self.assertRaises(SessionBindingBroken) as cm:
239                    BoundSessionMiddleware.recheck_session_net(*args)
240                self.assertEqual(cm.exception.reason, expect[0])
241                # Ensure the request can be logged without throwing errors
242                self.client.force_login(self.user)
243                request = HttpRequest()
244                request.session = self.client.session
245                request.user = self.user
246                logout_extra(request, cm.exception)

Test logout_extra with exception

def test_binding_geo_break_log(self):
248    def test_binding_geo_break_log(self):
249        """Test logout_extra with exception"""
250        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
251        for args, expect in [
252            [[GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"], ["geoip.missing"]],
253            [[GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"], ["geoip.continent"]],
254            [
255                [GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"],
256                ["geoip.country"],
257            ],
258            [
259                [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"],
260                ["geoip.city"],
261            ],
262        ]:
263            with self.subTest(args[0]):
264                with self.assertRaises(SessionBindingBroken) as cm:
265                    BoundSessionMiddleware.recheck_session_geo(*args)
266                self.assertEqual(cm.exception.reason, expect[0])
267                # Ensure the request can be logged without throwing errors
268                self.client.force_login(self.user)
269                request = HttpRequest()
270                request.session = self.client.session
271                request.user = self.user
272                logout_extra(request, cm.exception)

Test logout_extra with exception

def test_session_binding_broken(self):
274    def test_session_binding_broken(self):
275        """Test session binding"""
276        Event.objects.all().delete()
277        self.client.force_login(self.user)
278        session = self.client.session
279        session[Session.Keys.LAST_IP] = "192.0.2.1"
280        session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
281        session.save()
282
283        res = self.client.get(reverse("authentik_api:user-me"))
284        self.assertEqual(res.status_code, 302)
285        self.assertEqual(
286            res.url,
287            reverse(
288                "authentik_flows:default-authentication",
289            )
290            + f"?{NEXT_ARG_NAME}={reverse("authentik_api:user-me")}",
291        )
292        event = Event.objects.filter(action=EventAction.LOGOUT).first()
293        self.assertEqual(event.user, get_user(self.user))

Test session binding