authentik.stages.user_write.tests

write tests

  1"""write tests"""
  2
  3from unittest.mock import patch
  4
  5from django.urls import reverse
  6from django.utils.timezone import now
  7
  8from authentik.core.models import (
  9    USER_ATTRIBUTE_SOURCES,
 10    Group,
 11    Source,
 12    User,
 13    UserSourceConnection,
 14)
 15from authentik.core.sources.stage import PLAN_CONTEXT_SOURCES_CONNECTION
 16from authentik.core.tests.utils import create_test_admin_user, create_test_flow
 17from authentik.events.models import Event, EventAction
 18from authentik.flows.markers import StageMarker
 19from authentik.flows.models import FlowStageBinding
 20from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 21from authentik.flows.tests import FlowTestCase
 22from authentik.flows.tests.test_executor import TO_STAGE_RESPONSE_MOCK
 23from authentik.flows.views.executor import SESSION_KEY_PLAN
 24from authentik.lib.generators import generate_key
 25from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 26from authentik.stages.user_write.models import UserCreationMode, UserWriteStage
 27from authentik.stages.user_write.stage import PLAN_CONTEXT_GROUPS, UserWriteStageView
 28
 29
 30class TestUserWriteStage(FlowTestCase):
 31    """Write tests"""
 32
 33    def setUp(self):
 34        super().setUp()
 35        self.flow = create_test_flow()
 36        self.group = Group.objects.create(name="test-group")
 37        self.other_group = Group.objects.create(name="other-group")
 38        self.stage: UserWriteStage = UserWriteStage.objects.create(
 39            name="write", create_users_as_inactive=True, create_users_group=self.group
 40        )
 41        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 42        self.source = Source.objects.create(name="fake_source")
 43        self.user = create_test_admin_user()
 44
 45    def test_user_create(self):
 46        """Test creation of user"""
 47        password = generate_key()
 48
 49        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 50        plan.context[PLAN_CONTEXT_PROMPT] = {
 51            "username": "test-user",
 52            "name": "name",
 53            "email": "test@goauthentik.io",
 54            "password": password,
 55        }
 56        plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group]
 57        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
 58        session = self.client.session
 59        session[SESSION_KEY_PLAN] = plan
 60        session.save()
 61
 62        response = self.client.get(
 63            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 64        )
 65
 66        self.assertEqual(response.status_code, 200)
 67        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 68        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
 69        self.assertTrue(user_qs.exists())
 70        user = user_qs.first()
 71        self.assertTrue(user.check_password(password))
 72        self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group])
 73        self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]})
 74
 75        self.assertTrue(
 76            Event.objects.filter(
 77                action=EventAction.MODEL_CREATED,
 78                context__model={
 79                    "app": "authentik_core",
 80                    "model_name": "user",
 81                    "pk": user.pk,
 82                    "name": "name",
 83                },
 84            )
 85        )
 86        self.assertTrue(
 87            Event.objects.filter(
 88                action=EventAction.MODEL_UPDATED,
 89                context__model={
 90                    "app": "authentik_core",
 91                    "model_name": "user",
 92                    "pk": user.pk,
 93                    "name": "name",
 94                },
 95            )
 96        )
 97
 98    def test_user_update(self):
 99        """Test update of existing user"""
100        new_password = generate_key()
101        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
102        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
103            username="unittest", email="test@goauthentik.io"
104        )
105        plan.context[PLAN_CONTEXT_PROMPT] = {
106            "username": "test-user-new",
107            "password": new_password,
108            "attributes.some.custom-attribute": "test",
109            "attributes.some_custom_attribute": "test",
110            "attributes": {
111                "foo": "bar",
112            },
113            "some_ignored_attribute": "bar",
114        }
115        session = self.client.session
116        session[SESSION_KEY_PLAN] = plan
117        session.save()
118
119        response = self.client.post(
120            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
121        )
122
123        self.assertEqual(response.status_code, 200)
124        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
125        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
126        self.assertTrue(user_qs.exists())
127        self.assertTrue(user_qs.first().check_password(new_password))
128        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
129        self.assertEqual(user_qs.first().attributes["foo"], "bar")
130        self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test")
131
132    def test_user_update_complex(self):
133        """Test update of existing user"""
134        new_password = generate_key()
135        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
136        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
137            username="unittest", email="test@goauthentik.io"
138        )
139        time = now()
140        plan.context[PLAN_CONTEXT_PROMPT] = {
141            "username": "test-user-new",
142            "password": new_password,
143            "attributes.foo": time,
144        }
145        session = self.client.session
146        session[SESSION_KEY_PLAN] = plan
147        session.save()
148
149        response = self.client.post(
150            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
151        )
152
153        self.assertEqual(response.status_code, 200)
154        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
155        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
156        self.assertTrue(user_qs.exists())
157        self.assertTrue(user_qs.first().check_password(new_password))
158        self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z")
159
160    def test_user_update_source(self):
161        """Test update of existing user with a source"""
162        new_password = generate_key()
163        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
164        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
165            username="unittest",
166            email="test@goauthentik.io",
167            attributes={
168                USER_ATTRIBUTE_SOURCES: [
169                    self.source.name,
170                ]
171            },
172        )
173        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
174        plan.context[PLAN_CONTEXT_PROMPT] = {
175            "username": "test-user-new",
176            "password": new_password,
177            "attributes.some.custom-attribute": "test",
178            "attributes": {
179                "foo": "bar",
180            },
181            "some_ignored_attribute": "bar",
182        }
183        session = self.client.session
184        session[SESSION_KEY_PLAN] = plan
185        session.save()
186
187        response = self.client.post(
188            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
189        )
190
191        self.assertEqual(response.status_code, 200)
192        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
193        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
194        self.assertTrue(user_qs.exists())
195        self.assertTrue(user_qs.first().check_password(new_password))
196        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
197        self.assertEqual(user_qs.first().attributes["foo"], "bar")
198        self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name])
199        self.assertNotIn("some_ignored_attribute", user_qs.first().attributes)
200
201    @patch(
202        "authentik.flows.views.executor.to_stage_response",
203        TO_STAGE_RESPONSE_MOCK,
204    )
205    def test_without_data(self):
206        """Test without data results in error"""
207        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
208        session = self.client.session
209        session[SESSION_KEY_PLAN] = plan
210        session.save()
211
212        response = self.client.get(
213            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
214        )
215
216        self.assertStageResponse(
217            response,
218            self.flow,
219            component="ak-stage-access-denied",
220        )
221
222    @patch(
223        "authentik.flows.views.executor.to_stage_response",
224        TO_STAGE_RESPONSE_MOCK,
225    )
226    def test_blank_username(self):
227        """Test with blank username results in error"""
228        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
229        session = self.client.session
230        plan.context[PLAN_CONTEXT_PROMPT] = {
231            "username": "",
232            "attribute_some-custom-attribute": "test",
233            "some_ignored_attribute": "bar",
234        }
235        session[SESSION_KEY_PLAN] = plan
236        session.save()
237
238        response = self.client.get(
239            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
240        )
241
242        self.assertStageResponse(
243            response,
244            self.flow,
245            component="ak-stage-access-denied",
246        )
247
248    def test_authenticated_no_user(self):
249        """Test user in session and none in plan"""
250        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
251        self.client.force_login(self.user)
252        session = self.client.session
253        plan.context[PLAN_CONTEXT_PROMPT] = {
254            "username": "foo",
255            "attribute_some-custom-attribute": "test",
256            "some_ignored_attribute": "bar",
257        }
258        session[SESSION_KEY_PLAN] = plan
259        session.save()
260
261        response = self.client.get(
262            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
263        )
264        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
265        self.user.refresh_from_db()
266        self.assertEqual(self.user.username, "foo")
267
268    def test_no_create(self):
269        """Test can_create_users set to false"""
270        self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE
271        self.stage.save()
272        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
273        session = self.client.session
274        plan.context[PLAN_CONTEXT_PROMPT] = {
275            "username": "foo",
276            "attribute_some-custom-attribute": "test",
277            "some_ignored_attribute": "bar",
278        }
279        session[SESSION_KEY_PLAN] = plan
280        session.save()
281
282        response = self.client.get(
283            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
284        )
285        self.assertStageResponse(
286            response,
287            self.flow,
288            component="ak-stage-access-denied",
289        )
290
291    @patch(
292        "authentik.flows.views.executor.to_stage_response",
293        TO_STAGE_RESPONSE_MOCK,
294    )
295    def test_duplicate_data(self):
296        """Test with duplicate data, should trigger error"""
297        user = create_test_admin_user()
298        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
299        session = self.client.session
300        plan.context[PLAN_CONTEXT_PROMPT] = {
301            "username": user.username,
302            "attribute_some-custom-attribute": "test",
303            "some_ignored_attribute": "bar",
304        }
305        session[SESSION_KEY_PLAN] = plan
306        session.save()
307
308        response = self.client.get(
309            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
310        )
311
312        self.assertStageResponse(
313            response,
314            self.flow,
315            component="ak-stage-access-denied",
316        )
317
318    def test_write_attribute(self):
319        """Test write_attribute"""
320        user = create_test_admin_user()
321        user.attributes = {
322            "foo": "bar",
323            "baz": {
324                "qwer": [
325                    "quox",
326                ]
327            },
328        }
329        user.save()
330        UserWriteStageView.write_attribute(user, "attributes.foo", "baz")
331        self.assertEqual(
332            user.attributes,
333            {
334                "foo": "baz",
335                "baz": {
336                    "qwer": [
337                        "quox",
338                    ]
339                },
340            },
341        )
342        UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz")
343        self.assertEqual(
344            user.attributes,
345            {
346                "foo": "baz",
347                "foob": {
348                    "bar": "baz",
349                },
350                "baz": {
351                    "qwer": [
352                        "quox",
353                    ]
354                },
355            },
356        )
class TestUserWriteStage(authentik.flows.tests.FlowTestCase):
 31class TestUserWriteStage(FlowTestCase):
 32    """Write tests"""
 33
 34    def setUp(self):
 35        super().setUp()
 36        self.flow = create_test_flow()
 37        self.group = Group.objects.create(name="test-group")
 38        self.other_group = Group.objects.create(name="other-group")
 39        self.stage: UserWriteStage = UserWriteStage.objects.create(
 40            name="write", create_users_as_inactive=True, create_users_group=self.group
 41        )
 42        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 43        self.source = Source.objects.create(name="fake_source")
 44        self.user = create_test_admin_user()
 45
 46    def test_user_create(self):
 47        """Test creation of user"""
 48        password = generate_key()
 49
 50        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 51        plan.context[PLAN_CONTEXT_PROMPT] = {
 52            "username": "test-user",
 53            "name": "name",
 54            "email": "test@goauthentik.io",
 55            "password": password,
 56        }
 57        plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group]
 58        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
 59        session = self.client.session
 60        session[SESSION_KEY_PLAN] = plan
 61        session.save()
 62
 63        response = self.client.get(
 64            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 65        )
 66
 67        self.assertEqual(response.status_code, 200)
 68        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
 69        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
 70        self.assertTrue(user_qs.exists())
 71        user = user_qs.first()
 72        self.assertTrue(user.check_password(password))
 73        self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group])
 74        self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]})
 75
 76        self.assertTrue(
 77            Event.objects.filter(
 78                action=EventAction.MODEL_CREATED,
 79                context__model={
 80                    "app": "authentik_core",
 81                    "model_name": "user",
 82                    "pk": user.pk,
 83                    "name": "name",
 84                },
 85            )
 86        )
 87        self.assertTrue(
 88            Event.objects.filter(
 89                action=EventAction.MODEL_UPDATED,
 90                context__model={
 91                    "app": "authentik_core",
 92                    "model_name": "user",
 93                    "pk": user.pk,
 94                    "name": "name",
 95                },
 96            )
 97        )
 98
 99    def test_user_update(self):
100        """Test update of existing user"""
101        new_password = generate_key()
102        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
103        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
104            username="unittest", email="test@goauthentik.io"
105        )
106        plan.context[PLAN_CONTEXT_PROMPT] = {
107            "username": "test-user-new",
108            "password": new_password,
109            "attributes.some.custom-attribute": "test",
110            "attributes.some_custom_attribute": "test",
111            "attributes": {
112                "foo": "bar",
113            },
114            "some_ignored_attribute": "bar",
115        }
116        session = self.client.session
117        session[SESSION_KEY_PLAN] = plan
118        session.save()
119
120        response = self.client.post(
121            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
122        )
123
124        self.assertEqual(response.status_code, 200)
125        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
126        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
127        self.assertTrue(user_qs.exists())
128        self.assertTrue(user_qs.first().check_password(new_password))
129        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
130        self.assertEqual(user_qs.first().attributes["foo"], "bar")
131        self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test")
132
133    def test_user_update_complex(self):
134        """Test update of existing user"""
135        new_password = generate_key()
136        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
137        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
138            username="unittest", email="test@goauthentik.io"
139        )
140        time = now()
141        plan.context[PLAN_CONTEXT_PROMPT] = {
142            "username": "test-user-new",
143            "password": new_password,
144            "attributes.foo": time,
145        }
146        session = self.client.session
147        session[SESSION_KEY_PLAN] = plan
148        session.save()
149
150        response = self.client.post(
151            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
152        )
153
154        self.assertEqual(response.status_code, 200)
155        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
156        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
157        self.assertTrue(user_qs.exists())
158        self.assertTrue(user_qs.first().check_password(new_password))
159        self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z")
160
161    def test_user_update_source(self):
162        """Test update of existing user with a source"""
163        new_password = generate_key()
164        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
165        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
166            username="unittest",
167            email="test@goauthentik.io",
168            attributes={
169                USER_ATTRIBUTE_SOURCES: [
170                    self.source.name,
171                ]
172            },
173        )
174        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
175        plan.context[PLAN_CONTEXT_PROMPT] = {
176            "username": "test-user-new",
177            "password": new_password,
178            "attributes.some.custom-attribute": "test",
179            "attributes": {
180                "foo": "bar",
181            },
182            "some_ignored_attribute": "bar",
183        }
184        session = self.client.session
185        session[SESSION_KEY_PLAN] = plan
186        session.save()
187
188        response = self.client.post(
189            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
190        )
191
192        self.assertEqual(response.status_code, 200)
193        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
194        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
195        self.assertTrue(user_qs.exists())
196        self.assertTrue(user_qs.first().check_password(new_password))
197        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
198        self.assertEqual(user_qs.first().attributes["foo"], "bar")
199        self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name])
200        self.assertNotIn("some_ignored_attribute", user_qs.first().attributes)
201
202    @patch(
203        "authentik.flows.views.executor.to_stage_response",
204        TO_STAGE_RESPONSE_MOCK,
205    )
206    def test_without_data(self):
207        """Test without data results in error"""
208        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
209        session = self.client.session
210        session[SESSION_KEY_PLAN] = plan
211        session.save()
212
213        response = self.client.get(
214            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
215        )
216
217        self.assertStageResponse(
218            response,
219            self.flow,
220            component="ak-stage-access-denied",
221        )
222
223    @patch(
224        "authentik.flows.views.executor.to_stage_response",
225        TO_STAGE_RESPONSE_MOCK,
226    )
227    def test_blank_username(self):
228        """Test with blank username results in error"""
229        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
230        session = self.client.session
231        plan.context[PLAN_CONTEXT_PROMPT] = {
232            "username": "",
233            "attribute_some-custom-attribute": "test",
234            "some_ignored_attribute": "bar",
235        }
236        session[SESSION_KEY_PLAN] = plan
237        session.save()
238
239        response = self.client.get(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
241        )
242
243        self.assertStageResponse(
244            response,
245            self.flow,
246            component="ak-stage-access-denied",
247        )
248
249    def test_authenticated_no_user(self):
250        """Test user in session and none in plan"""
251        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
252        self.client.force_login(self.user)
253        session = self.client.session
254        plan.context[PLAN_CONTEXT_PROMPT] = {
255            "username": "foo",
256            "attribute_some-custom-attribute": "test",
257            "some_ignored_attribute": "bar",
258        }
259        session[SESSION_KEY_PLAN] = plan
260        session.save()
261
262        response = self.client.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
264        )
265        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
266        self.user.refresh_from_db()
267        self.assertEqual(self.user.username, "foo")
268
269    def test_no_create(self):
270        """Test can_create_users set to false"""
271        self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE
272        self.stage.save()
273        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
274        session = self.client.session
275        plan.context[PLAN_CONTEXT_PROMPT] = {
276            "username": "foo",
277            "attribute_some-custom-attribute": "test",
278            "some_ignored_attribute": "bar",
279        }
280        session[SESSION_KEY_PLAN] = plan
281        session.save()
282
283        response = self.client.get(
284            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
285        )
286        self.assertStageResponse(
287            response,
288            self.flow,
289            component="ak-stage-access-denied",
290        )
291
292    @patch(
293        "authentik.flows.views.executor.to_stage_response",
294        TO_STAGE_RESPONSE_MOCK,
295    )
296    def test_duplicate_data(self):
297        """Test with duplicate data, should trigger error"""
298        user = create_test_admin_user()
299        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
300        session = self.client.session
301        plan.context[PLAN_CONTEXT_PROMPT] = {
302            "username": user.username,
303            "attribute_some-custom-attribute": "test",
304            "some_ignored_attribute": "bar",
305        }
306        session[SESSION_KEY_PLAN] = plan
307        session.save()
308
309        response = self.client.get(
310            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
311        )
312
313        self.assertStageResponse(
314            response,
315            self.flow,
316            component="ak-stage-access-denied",
317        )
318
319    def test_write_attribute(self):
320        """Test write_attribute"""
321        user = create_test_admin_user()
322        user.attributes = {
323            "foo": "bar",
324            "baz": {
325                "qwer": [
326                    "quox",
327                ]
328            },
329        }
330        user.save()
331        UserWriteStageView.write_attribute(user, "attributes.foo", "baz")
332        self.assertEqual(
333            user.attributes,
334            {
335                "foo": "baz",
336                "baz": {
337                    "qwer": [
338                        "quox",
339                    ]
340                },
341            },
342        )
343        UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz")
344        self.assertEqual(
345            user.attributes,
346            {
347                "foo": "baz",
348                "foob": {
349                    "bar": "baz",
350                },
351                "baz": {
352                    "qwer": [
353                        "quox",
354                    ]
355                },
356            },
357        )

Write tests

def setUp(self):
34    def setUp(self):
35        super().setUp()
36        self.flow = create_test_flow()
37        self.group = Group.objects.create(name="test-group")
38        self.other_group = Group.objects.create(name="other-group")
39        self.stage: UserWriteStage = UserWriteStage.objects.create(
40            name="write", create_users_as_inactive=True, create_users_group=self.group
41        )
42        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
43        self.source = Source.objects.create(name="fake_source")
44        self.user = create_test_admin_user()

Hook method for setting up the test fixture before exercising it.

def test_user_create(self):
46    def test_user_create(self):
47        """Test creation of user"""
48        password = generate_key()
49
50        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
51        plan.context[PLAN_CONTEXT_PROMPT] = {
52            "username": "test-user",
53            "name": "name",
54            "email": "test@goauthentik.io",
55            "password": password,
56        }
57        plan.context[PLAN_CONTEXT_GROUPS] = [self.other_group]
58        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
59        session = self.client.session
60        session[SESSION_KEY_PLAN] = plan
61        session.save()
62
63        response = self.client.get(
64            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
65        )
66
67        self.assertEqual(response.status_code, 200)
68        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
69        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
70        self.assertTrue(user_qs.exists())
71        user = user_qs.first()
72        self.assertTrue(user.check_password(password))
73        self.assertEqual(list(user.groups.order_by("name")), [self.other_group, self.group])
74        self.assertEqual(user.attributes, {USER_ATTRIBUTE_SOURCES: [self.source.name]})
75
76        self.assertTrue(
77            Event.objects.filter(
78                action=EventAction.MODEL_CREATED,
79                context__model={
80                    "app": "authentik_core",
81                    "model_name": "user",
82                    "pk": user.pk,
83                    "name": "name",
84                },
85            )
86        )
87        self.assertTrue(
88            Event.objects.filter(
89                action=EventAction.MODEL_UPDATED,
90                context__model={
91                    "app": "authentik_core",
92                    "model_name": "user",
93                    "pk": user.pk,
94                    "name": "name",
95                },
96            )
97        )

Test creation of user

def test_user_update(self):
 99    def test_user_update(self):
100        """Test update of existing user"""
101        new_password = generate_key()
102        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
103        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
104            username="unittest", email="test@goauthentik.io"
105        )
106        plan.context[PLAN_CONTEXT_PROMPT] = {
107            "username": "test-user-new",
108            "password": new_password,
109            "attributes.some.custom-attribute": "test",
110            "attributes.some_custom_attribute": "test",
111            "attributes": {
112                "foo": "bar",
113            },
114            "some_ignored_attribute": "bar",
115        }
116        session = self.client.session
117        session[SESSION_KEY_PLAN] = plan
118        session.save()
119
120        response = self.client.post(
121            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
122        )
123
124        self.assertEqual(response.status_code, 200)
125        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
126        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
127        self.assertTrue(user_qs.exists())
128        self.assertTrue(user_qs.first().check_password(new_password))
129        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
130        self.assertEqual(user_qs.first().attributes["foo"], "bar")
131        self.assertEqual(user_qs.first().attributes["some_custom_attribute"], "test")

Test update of existing user

def test_user_update_complex(self):
133    def test_user_update_complex(self):
134        """Test update of existing user"""
135        new_password = generate_key()
136        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
137        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
138            username="unittest", email="test@goauthentik.io"
139        )
140        time = now()
141        plan.context[PLAN_CONTEXT_PROMPT] = {
142            "username": "test-user-new",
143            "password": new_password,
144            "attributes.foo": time,
145        }
146        session = self.client.session
147        session[SESSION_KEY_PLAN] = plan
148        session.save()
149
150        response = self.client.post(
151            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
152        )
153
154        self.assertEqual(response.status_code, 200)
155        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
156        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
157        self.assertTrue(user_qs.exists())
158        self.assertTrue(user_qs.first().check_password(new_password))
159        self.assertEqual(user_qs.first().attributes["foo"], time.isoformat()[:-6] + "Z")

Test update of existing user

def test_user_update_source(self):
161    def test_user_update_source(self):
162        """Test update of existing user with a source"""
163        new_password = generate_key()
164        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
165        plan.context[PLAN_CONTEXT_PENDING_USER] = User.objects.create(
166            username="unittest",
167            email="test@goauthentik.io",
168            attributes={
169                USER_ATTRIBUTE_SOURCES: [
170                    self.source.name,
171                ]
172            },
173        )
174        plan.context[PLAN_CONTEXT_SOURCES_CONNECTION] = UserSourceConnection(source=self.source)
175        plan.context[PLAN_CONTEXT_PROMPT] = {
176            "username": "test-user-new",
177            "password": new_password,
178            "attributes.some.custom-attribute": "test",
179            "attributes": {
180                "foo": "bar",
181            },
182            "some_ignored_attribute": "bar",
183        }
184        session = self.client.session
185        session[SESSION_KEY_PLAN] = plan
186        session.save()
187
188        response = self.client.post(
189            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
190        )
191
192        self.assertEqual(response.status_code, 200)
193        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
194        user_qs = User.objects.filter(username=plan.context[PLAN_CONTEXT_PROMPT]["username"])
195        self.assertTrue(user_qs.exists())
196        self.assertTrue(user_qs.first().check_password(new_password))
197        self.assertEqual(user_qs.first().attributes["some"]["custom-attribute"], "test")
198        self.assertEqual(user_qs.first().attributes["foo"], "bar")
199        self.assertEqual(user_qs.first().attributes[USER_ATTRIBUTE_SOURCES], [self.source.name])
200        self.assertNotIn("some_ignored_attribute", user_qs.first().attributes)

Test update of existing user with a source

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_without_data(self):
202    @patch(
203        "authentik.flows.views.executor.to_stage_response",
204        TO_STAGE_RESPONSE_MOCK,
205    )
206    def test_without_data(self):
207        """Test without data results in error"""
208        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
209        session = self.client.session
210        session[SESSION_KEY_PLAN] = plan
211        session.save()
212
213        response = self.client.get(
214            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
215        )
216
217        self.assertStageResponse(
218            response,
219            self.flow,
220            component="ak-stage-access-denied",
221        )

Test without data results in error

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_blank_username(self):
223    @patch(
224        "authentik.flows.views.executor.to_stage_response",
225        TO_STAGE_RESPONSE_MOCK,
226    )
227    def test_blank_username(self):
228        """Test with blank username results in error"""
229        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
230        session = self.client.session
231        plan.context[PLAN_CONTEXT_PROMPT] = {
232            "username": "",
233            "attribute_some-custom-attribute": "test",
234            "some_ignored_attribute": "bar",
235        }
236        session[SESSION_KEY_PLAN] = plan
237        session.save()
238
239        response = self.client.get(
240            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
241        )
242
243        self.assertStageResponse(
244            response,
245            self.flow,
246            component="ak-stage-access-denied",
247        )

Test with blank username results in error

def test_authenticated_no_user(self):
249    def test_authenticated_no_user(self):
250        """Test user in session and none in plan"""
251        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
252        self.client.force_login(self.user)
253        session = self.client.session
254        plan.context[PLAN_CONTEXT_PROMPT] = {
255            "username": "foo",
256            "attribute_some-custom-attribute": "test",
257            "some_ignored_attribute": "bar",
258        }
259        session[SESSION_KEY_PLAN] = plan
260        session.save()
261
262        response = self.client.get(
263            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
264        )
265        self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
266        self.user.refresh_from_db()
267        self.assertEqual(self.user.username, "foo")

Test user in session and none in plan

def test_no_create(self):
269    def test_no_create(self):
270        """Test can_create_users set to false"""
271        self.stage.user_creation_mode = UserCreationMode.NEVER_CREATE
272        self.stage.save()
273        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
274        session = self.client.session
275        plan.context[PLAN_CONTEXT_PROMPT] = {
276            "username": "foo",
277            "attribute_some-custom-attribute": "test",
278            "some_ignored_attribute": "bar",
279        }
280        session[SESSION_KEY_PLAN] = plan
281        session.save()
282
283        response = self.client.get(
284            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
285        )
286        self.assertStageResponse(
287            response,
288            self.flow,
289            component="ak-stage-access-denied",
290        )

Test can_create_users set to false

@patch('authentik.flows.views.executor.to_stage_response', TO_STAGE_RESPONSE_MOCK)
def test_duplicate_data(self):
292    @patch(
293        "authentik.flows.views.executor.to_stage_response",
294        TO_STAGE_RESPONSE_MOCK,
295    )
296    def test_duplicate_data(self):
297        """Test with duplicate data, should trigger error"""
298        user = create_test_admin_user()
299        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
300        session = self.client.session
301        plan.context[PLAN_CONTEXT_PROMPT] = {
302            "username": user.username,
303            "attribute_some-custom-attribute": "test",
304            "some_ignored_attribute": "bar",
305        }
306        session[SESSION_KEY_PLAN] = plan
307        session.save()
308
309        response = self.client.get(
310            reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
311        )
312
313        self.assertStageResponse(
314            response,
315            self.flow,
316            component="ak-stage-access-denied",
317        )

Test with duplicate data, should trigger error

def test_write_attribute(self):
319    def test_write_attribute(self):
320        """Test write_attribute"""
321        user = create_test_admin_user()
322        user.attributes = {
323            "foo": "bar",
324            "baz": {
325                "qwer": [
326                    "quox",
327                ]
328            },
329        }
330        user.save()
331        UserWriteStageView.write_attribute(user, "attributes.foo", "baz")
332        self.assertEqual(
333            user.attributes,
334            {
335                "foo": "baz",
336                "baz": {
337                    "qwer": [
338                        "quox",
339                    ]
340                },
341            },
342        )
343        UserWriteStageView.write_attribute(user, "attributes.foob.bar", "baz")
344        self.assertEqual(
345            user.attributes,
346            {
347                "foo": "baz",
348                "foob": {
349                    "bar": "baz",
350                },
351                "baz": {
352                    "qwer": [
353                        "quox",
354                    ]
355                },
356            },
357        )

Test write_attribute