authentik.stages.user_write.tests
write tests
1"""write tests""" 2 3from unittest.mock import patch 4 5from django.urls import reverse 6from django.utils.timezone import now 7 8from authentik.core.models import ( 9 USER_ATTRIBUTE_SOURCES, 10 Group, 11 Source, 12 User, 13 UserSourceConnection, 14) 15from authentik.core.sources.stage import PLAN_CONTEXT_SOURCES_CONNECTION 16from authentik.core.tests.utils import create_test_admin_user, create_test_flow 17from authentik.events.models import Event, EventAction 18from authentik.flows.markers import StageMarker 19from authentik.flows.models import FlowStageBinding 20from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan 21from authentik.flows.tests import FlowTestCase 22from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK 23from authentik.flows.views.executor import SESSION_KEY_PLAN 24from authentik.lib.generators import generate_key 25from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 26from authentik.stages.user_write.models import UserCreationMode, UserWriteStage 27from authentik.stages.user_write.stage import PLAN_CONTEXT_GROUPS, UserWriteStageView 28 29 30class TestUserWriteStage(FlowTestCase): 31 """Write tests""" 32 33 def setUp(self): 34 super().setUp() 35 self.flow = create_test_flow() 36 self.group = Group.objects.create(name="test-group") 37 self.other_group = Group.objects.create(name="other-group") 38 self.stage: UserWriteStage = UserWriteStage.objects.create( 39 name="write", create_users_as_inactive=True, create_users_group=self.group 40 ) 41 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) 42 self.source = Source.objects.create(name="fake_source") 43 self.user = create_test_admin_user() 44 45 def test_user_create(self): 46 """Test creation of user""" 47 password = generate_key() 48 49 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 50 plan.context[PLAN_CONTEXT_PROMPT] = { 51 "username": "test-user", 52 "name": "name", 53 "email": "test@goauthentik.io", 54 "password": password, 55 } 56 plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group] 57 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 58 session = self.client.session 59 session[SESSION_KEY_PLAN] = plan 60 session.save() 61 62 response = self.client.get( 63 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 64 ) 65 66 self.assertEqual(response.status_code, 200) 67 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 68 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 69 self.assertTrue(user_qs.exists()) 70 user = user_qs.first() 71 self.assertTrue(user.check_password(password)) 72 self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group]) 73 self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]}) 74 75 self.assertTrue( 76 Event.objects.filter( 77 action=EventAction.MODEL_CREATED, 78 context__model={ 79 "app": "authentik_core", 80 "model_name": "user", 81 "pk": user.pk, 82 "name": "name", 83 }, 84 ) 85 ) 86 self.assertTrue( 87 Event.objects.filter( 88 action=EventAction.MODEL_UPDATED, 89 context__model={ 90 "app": "authentik_core", 91 "model_name": "user", 92 "pk": user.pk, 93 "name": "name", 94 }, 95 ) 96 ) 97 98 def test_user_update(self): 99 """Test update of existing user""" 100 new_password = generate_key() 101 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 102 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 103 username="unittest", email="test@goauthentik.io" 104 ) 105 plan.context[PLAN_CONTEXT_PROMPT] = { 106 "username": "test-user-new", 107 "password": new_password, 108 "attributes.some.custom-attribute": "test", 109 "attributes.some_custom_attribute": "test", 110 "attributes": { 111 "foo": "bar", 112 }, 113 "some_ignored_attribute": "bar", 114 } 115 session = self.client.session 116 session[SESSION_KEY_PLAN] = plan 117 session.save() 118 119 response = self.client.post( 120 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 121 ) 122 123 self.assertEqual(response.status_code, 200) 124 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 125 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 126 self.assertTrue(user_qs.exists()) 127 self.assertTrue(user_qs.first().check_password(new_password)) 128 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 129 self.assertEqual(user_qs.first().attributes["foo"], "bar") 130 self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test") 131 132 def test_user_update_complex(self): 133 """Test update of existing user""" 134 new_password = generate_key() 135 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 136 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 137 username="unittest", email="test@goauthentik.io" 138 ) 139 time = now() 140 plan.context[PLAN_CONTEXT_PROMPT] = { 141 "username": "test-user-new", 142 "password": new_password, 143 "attributes.foo": time, 144 } 145 session = self.client.session 146 session[SESSION_KEY_PLAN] = plan 147 session.save() 148 149 response = self.client.post( 150 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 151 ) 152 153 self.assertEqual(response.status_code, 200) 154 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 155 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 156 self.assertTrue(user_qs.exists()) 157 self.assertTrue(user_qs.first().check_password(new_password)) 158 self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z") 159 160 def test_user_update_source(self): 161 """Test update of existing user with a source""" 162 new_password = generate_key() 163 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 164 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 165 username="unittest", 166 email="test@goauthentik.io", 167 attributes={ 168 USER_ATTRIBUTE_SOURCES: [ 169 self.source.name, 170 ] 171 }, 172 ) 173 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 174 plan.context[PLAN_CONTEXT_PROMPT] = { 175 "username": "test-user-new", 176 "password": new_password, 177 "attributes.some.custom-attribute": "test", 178 "attributes": { 179 "foo": "bar", 180 }, 181 "some_ignored_attribute": "bar", 182 } 183 session = self.client.session 184 session[SESSION_KEY_PLAN] = plan 185 session.save() 186 187 response = self.client.post( 188 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 189 ) 190 191 self.assertEqual(response.status_code, 200) 192 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 193 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 194 self.assertTrue(user_qs.exists()) 195 self.assertTrue(user_qs.first().check_password(new_password)) 196 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 197 self.assertEqual(user_qs.first().attributes["foo"], "bar") 198 self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name]) 199 self.assertNotIn("some_ignored_attribute", user_qs.first().attributes) 200 201 @patch( 202 "authentik.flows.views.executor.to_stage_response", 203 TO_STAGE_RESPONSE_MOCK, 204 ) 205 def test_without_data(self): 206 """Test without data results in error""" 207 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 208 session = self.client.session 209 session[SESSION_KEY_PLAN] = plan 210 session.save() 211 212 response = self.client.get( 213 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 214 ) 215 216 self.assertStageResponse( 217 response, 218 self.flow, 219 component="ak-stage-access-denied", 220 ) 221 222 @patch( 223 "authentik.flows.views.executor.to_stage_response", 224 TO_STAGE_RESPONSE_MOCK, 225 ) 226 def test_blank_username(self): 227 """Test with blank username results in error""" 228 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 229 session = self.client.session 230 plan.context[PLAN_CONTEXT_PROMPT] = { 231 "username": "", 232 "attribute_some-custom-attribute": "test", 233 "some_ignored_attribute": "bar", 234 } 235 session[SESSION_KEY_PLAN] = plan 236 session.save() 237 238 response = self.client.get( 239 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 240 ) 241 242 self.assertStageResponse( 243 response, 244 self.flow, 245 component="ak-stage-access-denied", 246 ) 247 248 def test_authenticated_no_user(self): 249 """Test user in session and none in plan""" 250 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 251 self.client.force_login(self.user) 252 session = self.client.session 253 plan.context[PLAN_CONTEXT_PROMPT] = { 254 "username": "foo", 255 "attribute_some-custom-attribute": "test", 256 "some_ignored_attribute": "bar", 257 } 258 session[SESSION_KEY_PLAN] = plan 259 session.save() 260 261 response = self.client.get( 262 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 263 ) 264 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 265 self.user.refresh_from_db() 266 self.assertEqual(self.user.username, "foo") 267 268 def test_no_create(self): 269 """Test can_create_users set to false""" 270 self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE 271 self.stage.save() 272 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 273 session = self.client.session 274 plan.context[PLAN_CONTEXT_PROMPT] = { 275 "username": "foo", 276 "attribute_some-custom-attribute": "test", 277 "some_ignored_attribute": "bar", 278 } 279 session[SESSION_KEY_PLAN] = plan 280 session.save() 281 282 response = self.client.get( 283 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 284 ) 285 self.assertStageResponse( 286 response, 287 self.flow, 288 component="ak-stage-access-denied", 289 ) 290 291 @patch( 292 "authentik.flows.views.executor.to_stage_response", 293 TO_STAGE_RESPONSE_MOCK, 294 ) 295 def test_duplicate_data(self): 296 """Test with duplicate data, should trigger error""" 297 user = create_test_admin_user() 298 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 299 session = self.client.session 300 plan.context[PLAN_CONTEXT_PROMPT] = { 301 "username": user.username, 302 "attribute_some-custom-attribute": "test", 303 "some_ignored_attribute": "bar", 304 } 305 session[SESSION_KEY_PLAN] = plan 306 session.save() 307 308 response = self.client.get( 309 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 310 ) 311 312 self.assertStageResponse( 313 response, 314 self.flow, 315 component="ak-stage-access-denied", 316 ) 317 318 def test_write_attribute(self): 319 """Test write_attribute""" 320 user = create_test_admin_user() 321 user.attributes = { 322 "foo": "bar", 323 "baz": { 324 "qwer": [ 325 "quox", 326 ] 327 }, 328 } 329 user.save() 330 UserWriteStageView.write_attribute(user, "attributes.foo", "baz") 331 self.assertEqual( 332 user.attributes, 333 { 334 "foo": "baz", 335 "baz": { 336 "qwer": [ 337 "quox", 338 ] 339 }, 340 }, 341 ) 342 UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz") 343 self.assertEqual( 344 user.attributes, 345 { 346 "foo": "baz", 347 "foob": { 348 "bar": "baz", 349 }, 350 "baz": { 351 "qwer": [ 352 "quox", 353 ] 354 }, 355 }, 356 )
31class TestUserWriteStage(FlowTestCase): 32 """Write tests""" 33 34 def setUp(self): 35 super().setUp() 36 self.flow = create_test_flow() 37 self.group = Group.objects.create(name="test-group") 38 self.other_group = Group.objects.create(name="other-group") 39 self.stage: UserWriteStage = UserWriteStage.objects.create( 40 name="write", create_users_as_inactive=True, create_users_group=self.group 41 ) 42 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) 43 self.source = Source.objects.create(name="fake_source") 44 self.user = create_test_admin_user() 45 46 def test_user_create(self): 47 """Test creation of user""" 48 password = generate_key() 49 50 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 51 plan.context[PLAN_CONTEXT_PROMPT] = { 52 "username": "test-user", 53 "name": "name", 54 "email": "test@goauthentik.io", 55 "password": password, 56 } 57 plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group] 58 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 59 session = self.client.session 60 session[SESSION_KEY_PLAN] = plan 61 session.save() 62 63 response = self.client.get( 64 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 65 ) 66 67 self.assertEqual(response.status_code, 200) 68 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 69 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 70 self.assertTrue(user_qs.exists()) 71 user = user_qs.first() 72 self.assertTrue(user.check_password(password)) 73 self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group]) 74 self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]}) 75 76 self.assertTrue( 77 Event.objects.filter( 78 action=EventAction.MODEL_CREATED, 79 context__model={ 80 "app": "authentik_core", 81 "model_name": "user", 82 "pk": user.pk, 83 "name": "name", 84 }, 85 ) 86 ) 87 self.assertTrue( 88 Event.objects.filter( 89 action=EventAction.MODEL_UPDATED, 90 context__model={ 91 "app": "authentik_core", 92 "model_name": "user", 93 "pk": user.pk, 94 "name": "name", 95 }, 96 ) 97 ) 98 99 def test_user_update(self): 100 """Test update of existing user""" 101 new_password = generate_key() 102 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 103 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 104 username="unittest", email="test@goauthentik.io" 105 ) 106 plan.context[PLAN_CONTEXT_PROMPT] = { 107 "username": "test-user-new", 108 "password": new_password, 109 "attributes.some.custom-attribute": "test", 110 "attributes.some_custom_attribute": "test", 111 "attributes": { 112 "foo": "bar", 113 }, 114 "some_ignored_attribute": "bar", 115 } 116 session = self.client.session 117 session[SESSION_KEY_PLAN] = plan 118 session.save() 119 120 response = self.client.post( 121 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 122 ) 123 124 self.assertEqual(response.status_code, 200) 125 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 126 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 127 self.assertTrue(user_qs.exists()) 128 self.assertTrue(user_qs.first().check_password(new_password)) 129 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 130 self.assertEqual(user_qs.first().attributes["foo"], "bar") 131 self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test") 132 133 def test_user_update_complex(self): 134 """Test update of existing user""" 135 new_password = generate_key() 136 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 137 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 138 username="unittest", email="test@goauthentik.io" 139 ) 140 time = now() 141 plan.context[PLAN_CONTEXT_PROMPT] = { 142 "username": "test-user-new", 143 "password": new_password, 144 "attributes.foo": time, 145 } 146 session = self.client.session 147 session[SESSION_KEY_PLAN] = plan 148 session.save() 149 150 response = self.client.post( 151 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 152 ) 153 154 self.assertEqual(response.status_code, 200) 155 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 156 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 157 self.assertTrue(user_qs.exists()) 158 self.assertTrue(user_qs.first().check_password(new_password)) 159 self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z") 160 161 def test_user_update_source(self): 162 """Test update of existing user with a source""" 163 new_password = generate_key() 164 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 165 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 166 username="unittest", 167 email="test@goauthentik.io", 168 attributes={ 169 USER_ATTRIBUTE_SOURCES: [ 170 self.source.name, 171 ] 172 }, 173 ) 174 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 175 plan.context[PLAN_CONTEXT_PROMPT] = { 176 "username": "test-user-new", 177 "password": new_password, 178 "attributes.some.custom-attribute": "test", 179 "attributes": { 180 "foo": "bar", 181 }, 182 "some_ignored_attribute": "bar", 183 } 184 session = self.client.session 185 session[SESSION_KEY_PLAN] = plan 186 session.save() 187 188 response = self.client.post( 189 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 190 ) 191 192 self.assertEqual(response.status_code, 200) 193 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 194 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 195 self.assertTrue(user_qs.exists()) 196 self.assertTrue(user_qs.first().check_password(new_password)) 197 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 198 self.assertEqual(user_qs.first().attributes["foo"], "bar") 199 self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name]) 200 self.assertNotIn("some_ignored_attribute", user_qs.first().attributes) 201 202 @patch( 203 "authentik.flows.views.executor.to_stage_response", 204 TO_STAGE_RESPONSE_MOCK, 205 ) 206 def test_without_data(self): 207 """Test without data results in error""" 208 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 209 session = self.client.session 210 session[SESSION_KEY_PLAN] = plan 211 session.save() 212 213 response = self.client.get( 214 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 215 ) 216 217 self.assertStageResponse( 218 response, 219 self.flow, 220 component="ak-stage-access-denied", 221 ) 222 223 @patch( 224 "authentik.flows.views.executor.to_stage_response", 225 TO_STAGE_RESPONSE_MOCK, 226 ) 227 def test_blank_username(self): 228 """Test with blank username results in error""" 229 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 230 session = self.client.session 231 plan.context[PLAN_CONTEXT_PROMPT] = { 232 "username": "", 233 "attribute_some-custom-attribute": "test", 234 "some_ignored_attribute": "bar", 235 } 236 session[SESSION_KEY_PLAN] = plan 237 session.save() 238 239 response = self.client.get( 240 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 241 ) 242 243 self.assertStageResponse( 244 response, 245 self.flow, 246 component="ak-stage-access-denied", 247 ) 248 249 def test_authenticated_no_user(self): 250 """Test user in session and none in plan""" 251 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 252 self.client.force_login(self.user) 253 session = self.client.session 254 plan.context[PLAN_CONTEXT_PROMPT] = { 255 "username": "foo", 256 "attribute_some-custom-attribute": "test", 257 "some_ignored_attribute": "bar", 258 } 259 session[SESSION_KEY_PLAN] = plan 260 session.save() 261 262 response = self.client.get( 263 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 264 ) 265 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 266 self.user.refresh_from_db() 267 self.assertEqual(self.user.username, "foo") 268 269 def test_no_create(self): 270 """Test can_create_users set to false""" 271 self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE 272 self.stage.save() 273 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 274 session = self.client.session 275 plan.context[PLAN_CONTEXT_PROMPT] = { 276 "username": "foo", 277 "attribute_some-custom-attribute": "test", 278 "some_ignored_attribute": "bar", 279 } 280 session[SESSION_KEY_PLAN] = plan 281 session.save() 282 283 response = self.client.get( 284 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 285 ) 286 self.assertStageResponse( 287 response, 288 self.flow, 289 component="ak-stage-access-denied", 290 ) 291 292 @patch( 293 "authentik.flows.views.executor.to_stage_response", 294 TO_STAGE_RESPONSE_MOCK, 295 ) 296 def test_duplicate_data(self): 297 """Test with duplicate data, should trigger error""" 298 user = create_test_admin_user() 299 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 300 session = self.client.session 301 plan.context[PLAN_CONTEXT_PROMPT] = { 302 "username": user.username, 303 "attribute_some-custom-attribute": "test", 304 "some_ignored_attribute": "bar", 305 } 306 session[SESSION_KEY_PLAN] = plan 307 session.save() 308 309 response = self.client.get( 310 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 311 ) 312 313 self.assertStageResponse( 314 response, 315 self.flow, 316 component="ak-stage-access-denied", 317 ) 318 319 def test_write_attribute(self): 320 """Test write_attribute""" 321 user = create_test_admin_user() 322 user.attributes = { 323 "foo": "bar", 324 "baz": { 325 "qwer": [ 326 "quox", 327 ] 328 }, 329 } 330 user.save() 331 UserWriteStageView.write_attribute(user, "attributes.foo", "baz") 332 self.assertEqual( 333 user.attributes, 334 { 335 "foo": "baz", 336 "baz": { 337 "qwer": [ 338 "quox", 339 ] 340 }, 341 }, 342 ) 343 UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz") 344 self.assertEqual( 345 user.attributes, 346 { 347 "foo": "baz", 348 "foob": { 349 "bar": "baz", 350 }, 351 "baz": { 352 "qwer": [ 353 "quox", 354 ] 355 }, 356 }, 357 )
Write tests
def
setUp(self):
34 def setUp(self): 35 super().setUp() 36 self.flow = create_test_flow() 37 self.group = Group.objects.create(name="test-group") 38 self.other_group = Group.objects.create(name="other-group") 39 self.stage: UserWriteStage = UserWriteStage.objects.create( 40 name="write", create_users_as_inactive=True, create_users_group=self.group 41 ) 42 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) 43 self.source = Source.objects.create(name="fake_source") 44 self.user = create_test_admin_user()
Hook method for setting up the test fixture before exercising it.
def
test_user_create(self):
46 def test_user_create(self): 47 """Test creation of user""" 48 password = generate_key() 49 50 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 51 plan.context[PLAN_CONTEXT_PROMPT] = { 52 "username": "test-user", 53 "name": "name", 54 "email": "test@goauthentik.io", 55 "password": password, 56 } 57 plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group] 58 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 59 session = self.client.session 60 session[SESSION_KEY_PLAN] = plan 61 session.save() 62 63 response = self.client.get( 64 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 65 ) 66 67 self.assertEqual(response.status_code, 200) 68 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 69 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 70 self.assertTrue(user_qs.exists()) 71 user = user_qs.first() 72 self.assertTrue(user.check_password(password)) 73 self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group]) 74 self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]}) 75 76 self.assertTrue( 77 Event.objects.filter( 78 action=EventAction.MODEL_CREATED, 79 context__model={ 80 "app": "authentik_core", 81 "model_name": "user", 82 "pk": user.pk, 83 "name": "name", 84 }, 85 ) 86 ) 87 self.assertTrue( 88 Event.objects.filter( 89 action=EventAction.MODEL_UPDATED, 90 context__model={ 91 "app": "authentik_core", 92 "model_name": "user", 93 "pk": user.pk, 94 "name": "name", 95 }, 96 ) 97 )
Test creation of user
def
test_user_update(self):
99 def test_user_update(self): 100 """Test update of existing user""" 101 new_password = generate_key() 102 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 103 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 104 username="unittest", email="test@goauthentik.io" 105 ) 106 plan.context[PLAN_CONTEXT_PROMPT] = { 107 "username": "test-user-new", 108 "password": new_password, 109 "attributes.some.custom-attribute": "test", 110 "attributes.some_custom_attribute": "test", 111 "attributes": { 112 "foo": "bar", 113 }, 114 "some_ignored_attribute": "bar", 115 } 116 session = self.client.session 117 session[SESSION_KEY_PLAN] = plan 118 session.save() 119 120 response = self.client.post( 121 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 122 ) 123 124 self.assertEqual(response.status_code, 200) 125 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 126 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 127 self.assertTrue(user_qs.exists()) 128 self.assertTrue(user_qs.first().check_password(new_password)) 129 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 130 self.assertEqual(user_qs.first().attributes["foo"], "bar") 131 self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test")
Test update of existing user
def
test_user_update_complex(self):
133 def test_user_update_complex(self): 134 """Test update of existing user""" 135 new_password = generate_key() 136 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 137 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 138 username="unittest", email="test@goauthentik.io" 139 ) 140 time = now() 141 plan.context[PLAN_CONTEXT_PROMPT] = { 142 "username": "test-user-new", 143 "password": new_password, 144 "attributes.foo": time, 145 } 146 session = self.client.session 147 session[SESSION_KEY_PLAN] = plan 148 session.save() 149 150 response = self.client.post( 151 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 152 ) 153 154 self.assertEqual(response.status_code, 200) 155 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 156 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 157 self.assertTrue(user_qs.exists()) 158 self.assertTrue(user_qs.first().check_password(new_password)) 159 self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z")
Test update of existing user
def
test_user_update_source(self):
161 def test_user_update_source(self): 162 """Test update of existing user with a source""" 163 new_password = generate_key() 164 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 165 plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create( 166 username="unittest", 167 email="test@goauthentik.io", 168 attributes={ 169 USER_ATTRIBUTE_SOURCES: [ 170 self.source.name, 171 ] 172 }, 173 ) 174 plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source) 175 plan.context[PLAN_CONTEXT_PROMPT] = { 176 "username": "test-user-new", 177 "password": new_password, 178 "attributes.some.custom-attribute": "test", 179 "attributes": { 180 "foo": "bar", 181 }, 182 "some_ignored_attribute": "bar", 183 } 184 session = self.client.session 185 session[SESSION_KEY_PLAN] = plan 186 session.save() 187 188 response = self.client.post( 189 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 190 ) 191 192 self.assertEqual(response.status_code, 200) 193 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 194 user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"]) 195 self.assertTrue(user_qs.exists()) 196 self.assertTrue(user_qs.first().check_password(new_password)) 197 self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test") 198 self.assertEqual(user_qs.first().attributes["foo"], "bar") 199 self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name]) 200 self.assertNotIn("some_ignored_attribute", user_qs.first().attributes)
Test update of existing user with a source
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def
test_without_data(self):
202 @patch( 203 "authentik.flows.views.executor.to_stage_response", 204 TO_STAGE_RESPONSE_MOCK, 205 ) 206 def test_without_data(self): 207 """Test without data results in error""" 208 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 209 session = self.client.session 210 session[SESSION_KEY_PLAN] = plan 211 session.save() 212 213 response = self.client.get( 214 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 215 ) 216 217 self.assertStageResponse( 218 response, 219 self.flow, 220 component="ak-stage-access-denied", 221 )
Test without data results in error
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def
test_blank_username(self):
223 @patch( 224 "authentik.flows.views.executor.to_stage_response", 225 TO_STAGE_RESPONSE_MOCK, 226 ) 227 def test_blank_username(self): 228 """Test with blank username results in error""" 229 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 230 session = self.client.session 231 plan.context[PLAN_CONTEXT_PROMPT] = { 232 "username": "", 233 "attribute_some-custom-attribute": "test", 234 "some_ignored_attribute": "bar", 235 } 236 session[SESSION_KEY_PLAN] = plan 237 session.save() 238 239 response = self.client.get( 240 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 241 ) 242 243 self.assertStageResponse( 244 response, 245 self.flow, 246 component="ak-stage-access-denied", 247 )
Test with blank username results in error
def
test_authenticated_no_user(self):
249 def test_authenticated_no_user(self): 250 """Test user in session and none in plan""" 251 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 252 self.client.force_login(self.user) 253 session = self.client.session 254 plan.context[PLAN_CONTEXT_PROMPT] = { 255 "username": "foo", 256 "attribute_some-custom-attribute": "test", 257 "some_ignored_attribute": "bar", 258 } 259 session[SESSION_KEY_PLAN] = plan 260 session.save() 261 262 response = self.client.get( 263 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 264 ) 265 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 266 self.user.refresh_from_db() 267 self.assertEqual(self.user.username, "foo")
Test user in session and none in plan
def
test_no_create(self):
269 def test_no_create(self): 270 """Test can_create_users set to false""" 271 self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE 272 self.stage.save() 273 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 274 session = self.client.session 275 plan.context[PLAN_CONTEXT_PROMPT] = { 276 "username": "foo", 277 "attribute_some-custom-attribute": "test", 278 "some_ignored_attribute": "bar", 279 } 280 session[SESSION_KEY_PLAN] = plan 281 session.save() 282 283 response = self.client.get( 284 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 285 ) 286 self.assertStageResponse( 287 response, 288 self.flow, 289 component="ak-stage-access-denied", 290 )
Test can_create_users set to false
@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def
test_duplicate_data(self):
292 @patch( 293 "authentik.flows.views.executor.to_stage_response", 294 TO_STAGE_RESPONSE_MOCK, 295 ) 296 def test_duplicate_data(self): 297 """Test with duplicate data, should trigger error""" 298 user = create_test_admin_user() 299 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 300 session = self.client.session 301 plan.context[PLAN_CONTEXT_PROMPT] = { 302 "username": user.username, 303 "attribute_some-custom-attribute": "test", 304 "some_ignored_attribute": "bar", 305 } 306 session[SESSION_KEY_PLAN] = plan 307 session.save() 308 309 response = self.client.get( 310 reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 311 ) 312 313 self.assertStageResponse( 314 response, 315 self.flow, 316 component="ak-stage-access-denied", 317 )
Test with duplicate data, should trigger error
def
test_write_attribute(self):
319 def test_write_attribute(self): 320 """Test write_attribute""" 321 user = create_test_admin_user() 322 user.attributes = { 323 "foo": "bar", 324 "baz": { 325 "qwer": [ 326 "quox", 327 ] 328 }, 329 } 330 user.save() 331 UserWriteStageView.write_attribute(user, "attributes.foo", "baz") 332 self.assertEqual( 333 user.attributes, 334 { 335 "foo": "baz", 336 "baz": { 337 "qwer": [ 338 "quox", 339 ] 340 }, 341 }, 342 ) 343 UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz") 344 self.assertEqual( 345 user.attributes, 346 { 347 "foo": "baz", 348 "foob": { 349 "bar": "baz", 350 }, 351 "baz": { 352 "qwer": [ 353 "quox", 354 ] 355 }, 356 }, 357 )
Test write_attribute