authentik.stages.user_login.tests
login tests
"""login tests"""

from time import sleep
from unittest.mock import patch

from django.http import HttpRequest
from django.urls import reverse
from django.utils.timezone import now

from authentik.blueprints.tests import apply_blueprint
from authentik.core.models import AuthenticatedSession, Session
from authentik.core.tests.utils import create_test_flow, create_test_user
from authentik.events.models import Event, EventAction
from authentik.events.utils import get_user
from authentik.flows.markers import StageMarker
from authentik.flows.models import FlowDesignation, FlowStageBinding
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK
from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.user_login.middleware import (
    SESSION_KEY_BINDING_NET,
    BoundSessionMiddleware,
    SessionBindingBroken,
    logout_extra,
)
from authentik.stages.user_login.models import GeoIPBinding, NetworkBinding, UserLoginStage


class TestUserLoginStage(FlowTestCase):
    """Tests for the user login stage: redirects, session expiry and session bindings."""

    def setUp(self):
        """Create the user, authentication flow, login stage and binding used by the tests."""
        super().setUp()
        self.user = create_test_user()

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.stage = UserLoginStage.objects.create(name="login")
        self.binding = FlowStageBinding.objects.create(
            target=self.flow,
            stage=self.stage,
            order=2,
        )

    def test_valid_get(self):
        """GET the executor with a pending user and expect the root redirect."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_valid_post(self):
        """POST to the executor with a pending user and expect the root redirect."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.post(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_terminate_other_sessions(self):
        """Check that login removes the user's other session when the stage is set to do so."""
        self.stage.terminate_other_sessions = True
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        # Second session of the same user, created before the login request is sent
        other_key = generate_id()
        other_session = Session.objects.create(
            session_key=other_key,
            last_ip=ClientIPMiddleware.default_ip,
        )
        AuthenticatedSession.objects.create(session=other_session, user=self.user)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.post(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        # Use .exists() on both querysets so neither is evaluated just to test truthiness
        self.assertFalse(
            AuthenticatedSession.objects.filter(session__session_key=other_key).exists()
        )
        self.assertFalse(Session.objects.filter(session_key=other_key).exists())

    def test_expiry(self):
        """Check that the session expires after the stage's session_duration."""
        self.stage.session_duration = "seconds=2"
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        started_at = now()
        response = self.client.get(executor_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertNotEqual(list(self.client.session.keys()), [])
        session_key = self.client.session.session_key
        stored_session = Session.objects.filter(session_key=session_key).first()
        # The stored expiry should lie one session_duration after the request, within a second
        actual_lifetime = stored_session.expires.timestamp() - started_at.timestamp()
        expected_lifetime = timedelta_from_string(self.stage.session_duration).total_seconds()
        self.assertAlmostEqual(actual_lifetime, expected_lifetime, delta=1)
        # Wait longer than the two-second duration before clearing expired sessions
        sleep(3)
        self.client.session.clear_expired()
        self.assertEqual(list(self.client.session.keys()), [])

    def test_expiry_remember(self):
        """Check that remember-me extends the session lifetime by remember_me_offset."""
        self.stage.session_duration = "seconds=2"
        self.stage.remember_me_offset = "seconds=2"
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        # With an offset configured the stage first renders its own challenge
        response = self.client.get(executor_url)
        self.assertStageResponse(response, component="ak-stage-user-login")

        response = self.client.post(executor_url, data={"remember_me": True})
        submitted_at = now().timestamp()
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertNotEqual(list(self.client.session.keys()), [])
        session_key = self.client.session.session_key
        stored_session = Session.objects.filter(session_key=session_key).first()
        actual_lifetime = stored_session.expires.timestamp() - submitted_at
        duration_seconds = timedelta_from_string(self.stage.session_duration).total_seconds()
        offset_seconds = timedelta_from_string(self.stage.remember_me_offset).total_seconds()
        self.assertAlmostEqual(actual_lifetime, duration_seconds + offset_seconds, delta=1)
        # Wait longer than duration plus offset before clearing expired sessions
        sleep(5)
        self.client.session.clear_expired()
        self.assertEqual(list(self.client.session.keys()), [])

    @patch("authentik.flows.views.executor.to_stage_response", TO_STAGE_RESPONSE_MOCK)
    def test_without_user(self):
        """Check that a plan without a pending user yields the access-denied stage."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)

        self.assertStageResponse(response, self.flow, component="ak-stage-access-denied")

    @apply_blueprint("default/flow-default-user-settings-flow.yaml")
    def test_inactive_account(self):
        """Check that an inactive user is denied by the stage, the API and the settings flow."""
        self.user.is_active = False
        self.user.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )

        # Check that API requests get rejected
        api_response = self.client.get(reverse("authentik_api:application-list"))
        self.assertEqual(api_response.status_code, 403)

        # Check that flow requests requiring a user also get rejected
        settings_url = reverse(
            "authentik_api:flow-executor",
            kwargs={"flow_slug": "default-user-settings-flow"},
        )
        settings_response = self.client.get(settings_url)
        self.assertStageResponse(
            settings_response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Flow does not apply to current user.",
        )

    def test_binding_net_break_log(self):
        """Check broken network bindings raise with the right reason and can be logged."""
        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
        cases = (
            ((NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"), "network.missing"),
            ((NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"), "network.asn"),
            (
                (NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"),
                "network.asn_network",
            ),
            ((NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"), "network.ip"),
        )
        for check_args, expected_reason in cases:
            with self.subTest(check_args[0]):
                with self.assertRaises(SessionBindingBroken) as raised:
                    BoundSessionMiddleware.recheck_session_net(*check_args)
                self.assertEqual(raised.exception.reason, expected_reason)
                # Ensure the request can be logged without throwing errors
                self.client.force_login(self.user)
                request = HttpRequest()
                request.session = self.client.session
                request.user = self.user
                logout_extra(request, raised.exception)

    def test_binding_geo_break_log(self):
        """Check broken GeoIP bindings raise with the right reason and can be logged."""
        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
        cases = (
            ((GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"), "geoip.missing"),
            ((GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"), "geoip.continent"),
            (
                (GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"),
                "geoip.country",
            ),
            (
                (GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"),
                "geoip.city",
            ),
        )
        for check_args, expected_reason in cases:
            with self.subTest(check_args[0]):
                with self.assertRaises(SessionBindingBroken) as raised:
                    BoundSessionMiddleware.recheck_session_geo(*check_args)
                self.assertEqual(raised.exception.reason, expected_reason)
                # Ensure the request can be logged without throwing errors
                self.client.force_login(self.user)
                request = HttpRequest()
                request.session = self.client.session
                request.user = self.user
                logout_extra(request, raised.exception)

    def test_session_binding_broken(self):
        """Check that a request with a broken network binding is redirected and logged.

        The session stores 192.0.2.1 as last IP together with the BIND_ASN_NETWORK_IP
        binding. The following API request is expected to be redirected to the default
        authentication flow, and a logout event for the user is expected to exist.
        """
        Event.objects.all().delete()
        self.client.force_login(self.user)
        session = self.client.session
        session[Session.Keys.LAST_IP] = "192.0.2.1"
        session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
        session.save()

        me_url = reverse("authentik_api:user-me")
        res = self.client.get(me_url)
        self.assertEqual(res.status_code, 302)
        # Build the expected URL from locals; reusing double quotes inside an f-string
        # only parses on Python 3.12+
        login_url = reverse("authentik_flows:default-authentication")
        self.assertEqual(res.url, f"{login_url}?{NEXT_ARG_NAME}={me_url}")
        event = Event.objects.filter(action=EventAction.LOGOUT).first()
        # Fail with a clear assertion instead of an AttributeError when no event exists
        self.assertIsNotNone(event)
        self.assertEqual(event.user, get_user(self.user))
class TestUserLoginStage(FlowTestCase):
    """Tests for the user login stage: redirects, session expiry and session bindings."""

    def setUp(self):
        """Create the user, authentication flow, login stage and binding used by the tests."""
        super().setUp()
        self.user = create_test_user()

        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
        self.stage = UserLoginStage.objects.create(name="login")
        self.binding = FlowStageBinding.objects.create(
            target=self.flow,
            stage=self.stage,
            order=2,
        )

    def test_valid_get(self):
        """GET the executor with a pending user and expect the root redirect."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_valid_post(self):
        """POST to the executor with a pending user and expect the root redirect."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.post(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))

    def test_terminate_other_sessions(self):
        """Check that login removes the user's other session when the stage is set to do so."""
        self.stage.terminate_other_sessions = True
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        # Second session of the same user, created before the login request is sent
        other_key = generate_id()
        other_session = Session.objects.create(
            session_key=other_key,
            last_ip=ClientIPMiddleware.default_ip,
        )
        AuthenticatedSession.objects.create(session=other_session, user=self.user)

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.post(executor_url)

        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        # Use .exists() on both querysets so neither is evaluated just to test truthiness
        self.assertFalse(
            AuthenticatedSession.objects.filter(session__session_key=other_key).exists()
        )
        self.assertFalse(Session.objects.filter(session_key=other_key).exists())

    def test_expiry(self):
        """Check that the session expires after the stage's session_duration."""
        self.stage.session_duration = "seconds=2"
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        started_at = now()
        response = self.client.get(executor_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertNotEqual(list(self.client.session.keys()), [])
        session_key = self.client.session.session_key
        stored_session = Session.objects.filter(session_key=session_key).first()
        # The stored expiry should lie one session_duration after the request, within a second
        actual_lifetime = stored_session.expires.timestamp() - started_at.timestamp()
        expected_lifetime = timedelta_from_string(self.stage.session_duration).total_seconds()
        self.assertAlmostEqual(actual_lifetime, expected_lifetime, delta=1)
        # Wait longer than the two-second duration before clearing expired sessions
        sleep(3)
        self.client.session.clear_expired()
        self.assertEqual(list(self.client.session.keys()), [])

    def test_expiry_remember(self):
        """Check that remember-me extends the session lifetime by remember_me_offset."""
        self.stage.session_duration = "seconds=2"
        self.stage.remember_me_offset = "seconds=2"
        self.stage.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        # With an offset configured the stage first renders its own challenge
        response = self.client.get(executor_url)
        self.assertStageResponse(response, component="ak-stage-user-login")

        response = self.client.post(executor_url, data={"remember_me": True})
        submitted_at = now().timestamp()
        self.assertEqual(response.status_code, 200)
        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
        self.assertNotEqual(list(self.client.session.keys()), [])
        session_key = self.client.session.session_key
        stored_session = Session.objects.filter(session_key=session_key).first()
        actual_lifetime = stored_session.expires.timestamp() - submitted_at
        duration_seconds = timedelta_from_string(self.stage.session_duration).total_seconds()
        offset_seconds = timedelta_from_string(self.stage.remember_me_offset).total_seconds()
        self.assertAlmostEqual(actual_lifetime, duration_seconds + offset_seconds, delta=1)
        # Wait longer than duration plus offset before clearing expired sessions
        sleep(5)
        self.client.session.clear_expired()
        self.assertEqual(list(self.client.session.keys()), [])

    @patch("authentik.flows.views.executor.to_stage_response", TO_STAGE_RESPONSE_MOCK)
    def test_without_user(self):
        """Check that a plan without a pending user yields the access-denied stage."""
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)

        self.assertStageResponse(response, self.flow, component="ak-stage-access-denied")

    @apply_blueprint("default/flow-default-user-settings-flow.yaml")
    def test_inactive_account(self):
        """Check that an inactive user is denied by the stage, the API and the settings flow."""
        self.user.is_active = False
        self.user.save()
        flow_plan = FlowPlan(
            flow_pk=self.flow.pk.hex,
            bindings=[self.binding],
            markers=[StageMarker()],
        )
        flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
        client_session = self.client.session
        client_session[SESSION_KEY_PLAN] = flow_plan
        client_session.save()

        executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
        response = self.client.get(executor_url)
        self.assertEqual(response.status_code, 200)
        self.assertStageResponse(
            response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Unknown error",
        )

        # Check that API requests get rejected
        api_response = self.client.get(reverse("authentik_api:application-list"))
        self.assertEqual(api_response.status_code, 403)

        # Check that flow requests requiring a user also get rejected
        settings_url = reverse(
            "authentik_api:flow-executor",
            kwargs={"flow_slug": "default-user-settings-flow"},
        )
        settings_response = self.client.get(settings_url)
        self.assertStageResponse(
            settings_response,
            self.flow,
            component="ak-stage-access-denied",
            error_message="Flow does not apply to current user.",
        )

    def test_binding_net_break_log(self):
        """Check broken network bindings raise with the right reason and can be logged."""
        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
        cases = (
            ((NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"), "network.missing"),
            ((NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"), "network.asn"),
            (
                (NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"),
                "network.asn_network",
            ),
            ((NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"), "network.ip"),
        )
        for check_args, expected_reason in cases:
            with self.subTest(check_args[0]):
                with self.assertRaises(SessionBindingBroken) as raised:
                    BoundSessionMiddleware.recheck_session_net(*check_args)
                self.assertEqual(raised.exception.reason, expected_reason)
                # Ensure the request can be logged without throwing errors
                self.client.force_login(self.user)
                request = HttpRequest()
                request.session = self.client.session
                request.user = self.user
                logout_extra(request, raised.exception)

    def test_binding_geo_break_log(self):
        """Check broken GeoIP bindings raise with the right reason and can be logged."""
        # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
        cases = (
            ((GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"), "geoip.missing"),
            ((GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"), "geoip.continent"),
            (
                (GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"),
                "geoip.country",
            ),
            (
                (GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"),
                "geoip.city",
            ),
        )
        for check_args, expected_reason in cases:
            with self.subTest(check_args[0]):
                with self.assertRaises(SessionBindingBroken) as raised:
                    BoundSessionMiddleware.recheck_session_geo(*check_args)
                self.assertEqual(raised.exception.reason, expected_reason)
                # Ensure the request can be logged without throwing errors
                self.client.force_login(self.user)
                request = HttpRequest()
                request.session = self.client.session
                request.user = self.user
                logout_extra(request, raised.exception)

    def test_session_binding_broken(self):
        """Check that a request with a broken network binding is redirected and logged.

        The session stores 192.0.2.1 as last IP together with the BIND_ASN_NETWORK_IP
        binding. The following API request is expected to be redirected to the default
        authentication flow, and a logout event for the user is expected to exist.
        """
        Event.objects.all().delete()
        self.client.force_login(self.user)
        session = self.client.session
        session[Session.Keys.LAST_IP] = "192.0.2.1"
        session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
        session.save()

        me_url = reverse("authentik_api:user-me")
        res = self.client.get(me_url)
        self.assertEqual(res.status_code, 302)
        # Build the expected URL from locals; reusing double quotes inside an f-string
        # only parses on Python 3.12+
        login_url = reverse("authentik_flows:default-authentication")
        self.assertEqual(res.url, f"{login_url}?{NEXT_ARG_NAME}={me_url}")
        event = Event.objects.filter(action=EventAction.LOGOUT).first()
        # Fail with a clear assertion instead of an AttributeError when no event exists
        self.assertIsNotNone(event)
        self.assertEqual(event.user, get_user(self.user))
Login tests
def
setUp(self):
def setUp(self):
    """Create the user, authentication flow, login stage and binding used by the tests."""
    super().setUp()
    self.user = create_test_user()

    self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
    self.stage = UserLoginStage.objects.create(name="login")
    self.binding = FlowStageBinding.objects.create(
        target=self.flow,
        stage=self.stage,
        order=2,
    )
Hook method for setting up the test fixture before exercising it.
def
test_valid_get(self):
def test_valid_get(self):
    """GET the executor with a pending user and expect the root redirect."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.get(executor_url)

    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test with a valid pending user and backend
def
test_valid_post(self):
def test_valid_post(self):
    """POST to the executor with a pending user and expect the root redirect."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.post(executor_url)

    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
Test a POST request with a valid pending user and backend
def
test_terminate_other_sessions(self):
def test_terminate_other_sessions(self):
    """Check that login removes the user's other session when the stage is set to do so."""
    self.stage.terminate_other_sessions = True
    self.stage.save()
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    # Second session of the same user, created before the login request is sent
    other_key = generate_id()
    other_session = Session.objects.create(
        session_key=other_key,
        last_ip=ClientIPMiddleware.default_ip,
    )
    AuthenticatedSession.objects.create(session=other_session, user=self.user)

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.post(executor_url)

    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
    # Use .exists() on both querysets so neither is evaluated just to test truthiness
    self.assertFalse(AuthenticatedSession.objects.filter(session__session_key=other_key).exists())
    self.assertFalse(Session.objects.filter(session_key=other_key).exists())
Test terminate_other_sessions
def
test_expiry(self):
def test_expiry(self):
    """Check that the session expires after the stage's session_duration."""
    self.stage.session_duration = "seconds=2"
    self.stage.save()
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    started_at = now()
    response = self.client.get(executor_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
    self.assertNotEqual(list(self.client.session.keys()), [])
    session_key = self.client.session.session_key
    stored_session = Session.objects.filter(session_key=session_key).first()
    # The stored expiry should lie one session_duration after the request, within a second
    actual_lifetime = stored_session.expires.timestamp() - started_at.timestamp()
    expected_lifetime = timedelta_from_string(self.stage.session_duration).total_seconds()
    self.assertAlmostEqual(actual_lifetime, expected_lifetime, delta=1)
    # Wait longer than the two-second duration before clearing expired sessions
    sleep(3)
    self.client.session.clear_expired()
    self.assertEqual(list(self.client.session.keys()), [])
Test with expiry
def
test_expiry_remember(self):
def test_expiry_remember(self):
    """Check that remember-me extends the session lifetime by remember_me_offset."""
    self.stage.session_duration = "seconds=2"
    self.stage.remember_me_offset = "seconds=2"
    self.stage.save()
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    # With an offset configured the stage first renders its own challenge
    response = self.client.get(executor_url)
    self.assertStageResponse(response, component="ak-stage-user-login")

    response = self.client.post(executor_url, data={"remember_me": True})
    submitted_at = now().timestamp()
    self.assertEqual(response.status_code, 200)
    self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
    self.assertNotEqual(list(self.client.session.keys()), [])
    session_key = self.client.session.session_key
    stored_session = Session.objects.filter(session_key=session_key).first()
    actual_lifetime = stored_session.expires.timestamp() - submitted_at
    duration_seconds = timedelta_from_string(self.stage.session_duration).total_seconds()
    offset_seconds = timedelta_from_string(self.stage.remember_me_offset).total_seconds()
    self.assertAlmostEqual(actual_lifetime, duration_seconds + offset_seconds, delta=1)
    # Wait longer than duration plus offset before clearing expired sessions
    sleep(5)
    self.client.session.clear_expired()
    self.assertEqual(list(self.client.session.keys()), [])
Test session expiry when remember-me is selected, which extends the session duration by the remember-me offset
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def
test_without_user(self):
@patch("authentik.flows.views.executor.to_stage_response", TO_STAGE_RESPONSE_MOCK)
def test_without_user(self):
    """Check that a plan without a pending user yields the access-denied stage."""
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.get(executor_url)

    self.assertStageResponse(response, self.flow, component="ak-stage-access-denied")
Test a plan without any pending user, resulting in an access-denied response
@apply_blueprint('default/flow-default-user-settings-flow.yaml')
def
test_inactive_account(self):
@apply_blueprint("default/flow-default-user-settings-flow.yaml")
def test_inactive_account(self):
    """Check that an inactive user is denied by the stage, the API and the settings flow."""
    self.user.is_active = False
    self.user.save()
    flow_plan = FlowPlan(
        flow_pk=self.flow.pk.hex,
        bindings=[self.binding],
        markers=[StageMarker()],
    )
    flow_plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
    client_session = self.client.session
    client_session[SESSION_KEY_PLAN] = flow_plan
    client_session.save()

    executor_url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
    response = self.client.get(executor_url)
    self.assertEqual(response.status_code, 200)
    self.assertStageResponse(
        response,
        self.flow,
        component="ak-stage-access-denied",
        error_message="Unknown error",
    )

    # Check that API requests get rejected
    api_response = self.client.get(reverse("authentik_api:application-list"))
    self.assertEqual(api_response.status_code, 403)

    # Check that flow requests requiring a user also get rejected
    settings_url = reverse(
        "authentik_api:flow-executor",
        kwargs={"flow_slug": "default-user-settings-flow"},
    )
    settings_response = self.client.get(settings_url)
    self.assertStageResponse(
        settings_response,
        self.flow,
        component="ak-stage-access-denied",
        error_message="Flow does not apply to current user.",
    )
Test that an inactive account is denied by the login stage and by subsequent API and flow requests
def
test_binding_net_break_log(self):
def test_binding_net_break_log(self):
    """Check broken network bindings raise with the right reason and can be logged."""
    # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-ASN-Test.json
    cases = (
        ((NetworkBinding.BIND_ASN, "8.8.8.8", "8.8.8.8"), "network.missing"),
        ((NetworkBinding.BIND_ASN, "1.0.0.1", "1.128.0.1"), "network.asn"),
        (
            (NetworkBinding.BIND_ASN_NETWORK, "12.81.96.1", "12.81.128.1"),
            "network.asn_network",
        ),
        ((NetworkBinding.BIND_ASN_NETWORK_IP, "1.0.0.1", "1.0.0.2"), "network.ip"),
    )
    for check_args, expected_reason in cases:
        with self.subTest(check_args[0]):
            with self.assertRaises(SessionBindingBroken) as raised:
                BoundSessionMiddleware.recheck_session_net(*check_args)
            self.assertEqual(raised.exception.reason, expected_reason)
            # Ensure the request can be logged without throwing errors
            self.client.force_login(self.user)
            request = HttpRequest()
            request.session = self.client.session
            request.user = self.user
            logout_extra(request, raised.exception)
Test logout_extra with the exception raised by a broken network session binding
def
test_binding_geo_break_log(self):
def test_binding_geo_break_log(self):
    """Check broken GeoIP bindings raise with the right reason and can be logged."""
    # IPs from https://github.com/maxmind/MaxMind-DB/blob/main/source-data/GeoLite2-City-Test.json
    cases = (
        ((GeoIPBinding.BIND_CONTINENT, "8.8.8.8", "8.8.8.8"), "geoip.missing"),
        ((GeoIPBinding.BIND_CONTINENT, "2.125.160.216", "67.43.156.1"), "geoip.continent"),
        (
            (GeoIPBinding.BIND_CONTINENT_COUNTRY, "81.2.69.142", "89.160.20.112"),
            "geoip.country",
        ),
        (
            (GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, "2.125.160.216", "81.2.69.142"),
            "geoip.city",
        ),
    )
    for check_args, expected_reason in cases:
        with self.subTest(check_args[0]):
            with self.assertRaises(SessionBindingBroken) as raised:
                BoundSessionMiddleware.recheck_session_geo(*check_args)
            self.assertEqual(raised.exception.reason, expected_reason)
            # Ensure the request can be logged without throwing errors
            self.client.force_login(self.user)
            request = HttpRequest()
            request.session = self.client.session
            request.user = self.user
            logout_extra(request, raised.exception)
Test logout_extra with the exception raised by a broken GeoIP session binding
def
test_session_binding_broken(self):
def test_session_binding_broken(self):
    """Check that a request with a broken network binding is redirected and logged.

    The session stores 192.0.2.1 as last IP together with the BIND_ASN_NETWORK_IP
    binding. The following API request is expected to be redirected to the default
    authentication flow, and a logout event for the user is expected to exist.
    """
    Event.objects.all().delete()
    self.client.force_login(self.user)
    session = self.client.session
    session[Session.Keys.LAST_IP] = "192.0.2.1"
    session[SESSION_KEY_BINDING_NET] = NetworkBinding.BIND_ASN_NETWORK_IP
    session.save()

    me_url = reverse("authentik_api:user-me")
    res = self.client.get(me_url)
    self.assertEqual(res.status_code, 302)
    # Build the expected URL from locals; reusing double quotes inside an f-string
    # only parses on Python 3.12+
    login_url = reverse("authentik_flows:default-authentication")
    self.assertEqual(res.url, f"{login_url}?{NEXT_ARG_NAME}={me_url}")
    event = Event.objects.filter(action=EventAction.LOGOUT).first()
    # Fail with a clear assertion instead of an AttributeError when no event exists
    self.assertIsNotNone(event)
    self.assertEqual(event.user, get_user(self.user))
Test session binding