authentik.stages.user_write.stage

Write stage logic

  1"""Write stage logic"""
  2
  3from typing import Any
  4
  5from django.contrib.auth import update_session_auth_hash
  6from django.db import transaction
  7from django.db.utils import IntegrityError, InternalError
  8from django.http import HttpRequest, HttpResponse
  9from django.utils.functional import SimpleLazyObject
 10from django.utils.translation import gettext as _
 11from rest_framework.exceptions import ValidationError
 12
 13from authentik.core.middleware import SESSION_KEY_IMPERSONATE_USER
 14from authentik.core.models import USER_ATTRIBUTE_SOURCES, User, UserSourceConnection, UserTypes
 15from authentik.core.sources.stage import PLAN_CONTEXT_SOURCES_CONNECTION
 16from authentik.events.utils import sanitize_dict, sanitize_item
 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 18from authentik.flows.stage import StageView
 19from authentik.flows.views.executor import FlowExecutorView
 20from authentik.lib.utils.dict import set_path_in_dict
 21from authentik.stages.password import BACKEND_INBUILT
 22from authentik.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
 23from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 24from authentik.stages.user_write.models import UserCreationMode
 25from authentik.stages.user_write.signals import user_write
 26
# Plan-context key: groups that `dispatch` adds the written user to.
PLAN_CONTEXT_GROUPS = "groups"
# Plan-context key: user type for a newly created user; `ensure_user` falls back to
# the stage's configured `user_type` when the key is absent.
PLAN_CONTEXT_USER_TYPE = "user_type"
# Plan-context key: path for a newly created user; `ensure_user` falls back to the
# stage's `user_path_template` when the key is absent.
PLAN_CONTEXT_USER_PATH = "user_path"
 30
 31
 32class UserWriteStageView(StageView):
 33    """Finalise Enrollment flow by creating a user object."""
 34
 35    def __init__(self, executor: FlowExecutorView, **kwargs):
 36        super().__init__(executor, **kwargs)
 37        self.disallowed_user_attributes = [
 38            "groups",
 39            # Block attribute writes that would otherwise land on the model's
 40            # primary key. An IdP that returns an `id` claim (mocksaml is one
 41            # example) used to crash the enrollment flow with
 42            # ValueError: Field 'id' expected a number but got '<hex>'
 43            # because hasattr(user, "id") is true and setattr(user, "id", ...)
 44            # was taken unchecked. See #21580.
 45            "id",
 46            "pk",
 47        ]
 48
 49    @staticmethod
 50    def write_attribute(user: User, key: str, value: Any):
 51        """Allow use of attributes.foo.bar when writing to a user, with full
 52        recursion"""
 53        parts = key.replace("attributes_", "attributes.", 1).split(".")
 54        if len(parts) < 1:  # pragma: no cover
 55            return
 56        # Function will always be called with a key like attributes.
 57        # this is just a sanity check to ensure that is removed
 58        if parts[0] == "attributes":
 59            parts = parts[1:]
 60        set_path_in_dict(user.attributes, ".".join(parts), sanitize_item(value))
 61
 62    def ensure_user(self) -> tuple[User | None, bool]:
 63        """Ensure a user exists"""
 64        user_created = False
 65        path = self.executor.plan.context.get(
 66            PLAN_CONTEXT_USER_PATH, self.executor.current_stage.user_path_template
 67        )
 68        if path == "":
 69            path = User.default_path()
 70
 71        try:
 72            user_type = UserTypes(
 73                self.executor.plan.context.get(
 74                    PLAN_CONTEXT_USER_TYPE,
 75                    self.executor.current_stage.user_type,
 76                )
 77            )
 78        except ValueError:
 79            user_type = self.executor.current_stage.user_type
 80        if user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
 81            user_type = UserTypes.SERVICE_ACCOUNT
 82
 83        if not self.request.user.is_anonymous:
 84            self.executor.plan.context.setdefault(PLAN_CONTEXT_PENDING_USER, self.request.user)
 85        if (
 86            PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context
 87            or self.executor.current_stage.user_creation_mode == UserCreationMode.ALWAYS_CREATE
 88        ):
 89            if self.executor.current_stage.user_creation_mode == UserCreationMode.NEVER_CREATE:
 90                return None, False
 91            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
 92                is_active=not self.executor.current_stage.create_users_as_inactive,
 93                path=path,
 94                type=user_type,
 95            )
 96            self.executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = BACKEND_INBUILT
 97            self.logger.debug(
 98                "Created new user",
 99                flow_slug=self.executor.flow.slug,
100            )
101            user_created = True
102        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
103        return user, user_created
104
105    def update_user(self, user: User):
106        """Update `user` with data from plan context
107
108        Only simple attributes are updated, nothing which requires a foreign key or m2m"""
109        data: dict = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
110        # This is always sent back but not written to the user
111        data.pop("component", None)
112        for key, value in data.items():
113            setter_name = f"set_{key}"
114            # Check if user has a setter for this key, like set_password
115            if key == "password":
116                user.set_password(value, request=self.request)
117            elif hasattr(user, setter_name):
118                setter = getattr(user, setter_name)
119                if callable(setter):
120                    setter(value)
121            elif key in self.disallowed_user_attributes:
122                self.logger.info("discarding key", key=key)
123                continue
124            # For exact attributes match, update the dictionary in place
125            elif key == "attributes":
126                if isinstance(value, dict):
127                    user.attributes.update(sanitize_dict(value))
128                else:
129                    raise ValidationError("Attempt to overwrite complete attributes")
130            # If using dot notation, use the correct helper to update the nested value
131            elif key.startswith("attributes.") or key.startswith("attributes_"):
132                UserWriteStageView.write_attribute(user, key, value)
133            # User has this key already
134            elif hasattr(user, key):
135                if isinstance(user, SimpleLazyObject):
136                    user._setup()
137                    user = user._wrapped
138                attr = getattr(type(user), key)
139                if isinstance(attr, property):
140                    if not attr.fset:
141                        self.logger.info("discarding key", key=key)
142                        continue
143                setattr(user, key, value)
144            # If none of the cases above matched, we have an attribute that the user doesn't have,
145            # has no setter for, is not a nested attributes value and as such is invalid
146            else:
147                self.logger.info("discarding key", key=key)
148                continue
149        # Check if we're writing from a source, and save the source to the attributes
150        if PLAN_CONTEXT_SOURCES_CONNECTION in self.executor.plan.context:
151            if USER_ATTRIBUTE_SOURCES not in user.attributes or not isinstance(
152                user.attributes.get(USER_ATTRIBUTE_SOURCES), list
153            ):
154                user.attributes[USER_ATTRIBUTE_SOURCES] = []
155            connection: UserSourceConnection = self.executor.plan.context[
156                PLAN_CONTEXT_SOURCES_CONNECTION
157            ]
158            if connection.source.name not in user.attributes[USER_ATTRIBUTE_SOURCES]:
159                user.attributes[USER_ATTRIBUTE_SOURCES].append(connection.source.name)
160
161    def dispatch(self, request: HttpRequest) -> HttpResponse:
162        """Save data in the current flow to the currently pending user. If no user is pending,
163        a new user is created."""
164        if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
165            message = _("No Pending data.")
166            self.logger.debug(message)
167            return self.executor.stage_invalid(message)
168        data = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
169        user, user_created = self.ensure_user()
170        if not user:
171            message = _("No user found and can't create new user.")
172            self.logger.info(message)
173            return self.executor.stage_invalid(message)
174        # Before we change anything, check if the user is the same as in the request
175        # and we're updating a password. In that case we need to update the session hash
176        # Also check that we're not currently impersonating, so we don't update the session
177        should_update_session = False
178        if (
179            any("password" in x for x in data.keys())
180            and self.request.user.pk == user.pk
181            and SESSION_KEY_IMPERSONATE_USER not in self.request.session
182        ):
183            should_update_session = True
184        try:
185            self.update_user(user)
186        except ValidationError as exc:
187            self.logger.warning("failed to update user", exc=exc)
188            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
189        # Extra check to prevent flows from saving a user with a blank username
190        if user.username == "":
191            self.logger.warning("Aborting write to empty username", user=user)
192            return self.executor.stage_invalid()
193        try:
194            with transaction.atomic():
195                user.save()
196                if self.executor.current_stage.create_users_group:
197                    user.groups.add(self.executor.current_stage.create_users_group)
198                if PLAN_CONTEXT_GROUPS in self.executor.plan.context:
199                    user.groups.add(*self.executor.plan.context[PLAN_CONTEXT_GROUPS])
200        except (IntegrityError, ValueError, TypeError, InternalError) as exc:
201            self.logger.warning("Failed to save user", exc=exc)
202            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
203        user_write.send(sender=self, request=request, user=user, data=data, created=user_created)
204        # Check if the password has been updated, and update the session auth hash
205        if should_update_session:
206            update_session_auth_hash(self.request, user)
207            self.logger.debug("Updated session hash", user=user)
208        self.logger.debug(
209            "Updated existing user",
210            user=user,
211            flow_slug=self.executor.flow.slug,
212        )
213        return self.executor.stage_ok()
PLAN_CONTEXT_GROUPS = 'groups'
PLAN_CONTEXT_USER_TYPE = 'user_type'
PLAN_CONTEXT_USER_PATH = 'user_path'
class UserWriteStageView(authentik.flows.stage.StageView):
 33class UserWriteStageView(StageView):
 34    """Finalise Enrollment flow by creating a user object."""
 35
 36    def __init__(self, executor: FlowExecutorView, **kwargs):
 37        super().__init__(executor, **kwargs)
 38        self.disallowed_user_attributes = [
 39            "groups",
 40            # Block attribute writes that would otherwise land on the model's
 41            # primary key. An IdP that returns an `id` claim (mocksaml is one
 42            # example) used to crash the enrollment flow with
 43            # ValueError: Field 'id' expected a number but got '<hex>'
 44            # because hasattr(user, "id") is true and setattr(user, "id", ...)
 45            # was taken unchecked. See #21580.
 46            "id",
 47            "pk",
 48        ]
 49
 50    @staticmethod
 51    def write_attribute(user: User, key: str, value: Any):
 52        """Allow use of attributes.foo.bar when writing to a user, with full
 53        recursion"""
 54        parts = key.replace("attributes_", "attributes.", 1).split(".")
 55        if len(parts) < 1:  # pragma: no cover
 56            return
 57        # Function will always be called with a key like attributes.
 58        # this is just a sanity check to ensure that is removed
 59        if parts[0] == "attributes":
 60            parts = parts[1:]
 61        set_path_in_dict(user.attributes, ".".join(parts), sanitize_item(value))
 62
 63    def ensure_user(self) -> tuple[User | None, bool]:
 64        """Ensure a user exists"""
 65        user_created = False
 66        path = self.executor.plan.context.get(
 67            PLAN_CONTEXT_USER_PATH, self.executor.current_stage.user_path_template
 68        )
 69        if path == "":
 70            path = User.default_path()
 71
 72        try:
 73            user_type = UserTypes(
 74                self.executor.plan.context.get(
 75                    PLAN_CONTEXT_USER_TYPE,
 76                    self.executor.current_stage.user_type,
 77                )
 78            )
 79        except ValueError:
 80            user_type = self.executor.current_stage.user_type
 81        if user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
 82            user_type = UserTypes.SERVICE_ACCOUNT
 83
 84        if not self.request.user.is_anonymous:
 85            self.executor.plan.context.setdefault(PLAN_CONTEXT_PENDING_USER, self.request.user)
 86        if (
 87            PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context
 88            or self.executor.current_stage.user_creation_mode == UserCreationMode.ALWAYS_CREATE
 89        ):
 90            if self.executor.current_stage.user_creation_mode == UserCreationMode.NEVER_CREATE:
 91                return None, False
 92            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
 93                is_active=not self.executor.current_stage.create_users_as_inactive,
 94                path=path,
 95                type=user_type,
 96            )
 97            self.executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = BACKEND_INBUILT
 98            self.logger.debug(
 99                "Created new user",
100                flow_slug=self.executor.flow.slug,
101            )
102            user_created = True
103        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
104        return user, user_created
105
106    def update_user(self, user: User):
107        """Update `user` with data from plan context
108
109        Only simple attributes are updated, nothing which requires a foreign key or m2m"""
110        data: dict = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
111        # This is always sent back but not written to the user
112        data.pop("component", None)
113        for key, value in data.items():
114            setter_name = f"set_{key}"
115            # Check if user has a setter for this key, like set_password
116            if key == "password":
117                user.set_password(value, request=self.request)
118            elif hasattr(user, setter_name):
119                setter = getattr(user, setter_name)
120                if callable(setter):
121                    setter(value)
122            elif key in self.disallowed_user_attributes:
123                self.logger.info("discarding key", key=key)
124                continue
125            # For exact attributes match, update the dictionary in place
126            elif key == "attributes":
127                if isinstance(value, dict):
128                    user.attributes.update(sanitize_dict(value))
129                else:
130                    raise ValidationError("Attempt to overwrite complete attributes")
131            # If using dot notation, use the correct helper to update the nested value
132            elif key.startswith("attributes.") or key.startswith("attributes_"):
133                UserWriteStageView.write_attribute(user, key, value)
134            # User has this key already
135            elif hasattr(user, key):
136                if isinstance(user, SimpleLazyObject):
137                    user._setup()
138                    user = user._wrapped
139                attr = getattr(type(user), key)
140                if isinstance(attr, property):
141                    if not attr.fset:
142                        self.logger.info("discarding key", key=key)
143                        continue
144                setattr(user, key, value)
145            # If none of the cases above matched, we have an attribute that the user doesn't have,
146            # has no setter for, is not a nested attributes value and as such is invalid
147            else:
148                self.logger.info("discarding key", key=key)
149                continue
150        # Check if we're writing from a source, and save the source to the attributes
151        if PLAN_CONTEXT_SOURCES_CONNECTION in self.executor.plan.context:
152            if USER_ATTRIBUTE_SOURCES not in user.attributes or not isinstance(
153                user.attributes.get(USER_ATTRIBUTE_SOURCES), list
154            ):
155                user.attributes[USER_ATTRIBUTE_SOURCES] = []
156            connection: UserSourceConnection = self.executor.plan.context[
157                PLAN_CONTEXT_SOURCES_CONNECTION
158            ]
159            if connection.source.name not in user.attributes[USER_ATTRIBUTE_SOURCES]:
160                user.attributes[USER_ATTRIBUTE_SOURCES].append(connection.source.name)
161
162    def dispatch(self, request: HttpRequest) -> HttpResponse:
163        """Save data in the current flow to the currently pending user. If no user is pending,
164        a new user is created."""
165        if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
166            message = _("No Pending data.")
167            self.logger.debug(message)
168            return self.executor.stage_invalid(message)
169        data = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
170        user, user_created = self.ensure_user()
171        if not user:
172            message = _("No user found and can't create new user.")
173            self.logger.info(message)
174            return self.executor.stage_invalid(message)
175        # Before we change anything, check if the user is the same as in the request
176        # and we're updating a password. In that case we need to update the session hash
177        # Also check that we're not currently impersonating, so we don't update the session
178        should_update_session = False
179        if (
180            any("password" in x for x in data.keys())
181            and self.request.user.pk == user.pk
182            and SESSION_KEY_IMPERSONATE_USER not in self.request.session
183        ):
184            should_update_session = True
185        try:
186            self.update_user(user)
187        except ValidationError as exc:
188            self.logger.warning("failed to update user", exc=exc)
189            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
190        # Extra check to prevent flows from saving a user with a blank username
191        if user.username == "":
192            self.logger.warning("Aborting write to empty username", user=user)
193            return self.executor.stage_invalid()
194        try:
195            with transaction.atomic():
196                user.save()
197                if self.executor.current_stage.create_users_group:
198                    user.groups.add(self.executor.current_stage.create_users_group)
199                if PLAN_CONTEXT_GROUPS in self.executor.plan.context:
200                    user.groups.add(*self.executor.plan.context[PLAN_CONTEXT_GROUPS])
201        except (IntegrityError, ValueError, TypeError, InternalError) as exc:
202            self.logger.warning("Failed to save user", exc=exc)
203            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
204        user_write.send(sender=self, request=request, user=user, data=data, created=user_created)
205        # Check if the password has been updated, and update the session auth hash
206        if should_update_session:
207            update_session_auth_hash(self.request, user)
208            self.logger.debug("Updated session hash", user=user)
209        self.logger.debug(
210            "Updated existing user",
211            user=user,
212            flow_slug=self.executor.flow.slug,
213        )
214        return self.executor.stage_ok()

Finalise Enrollment flow by creating a user object.

UserWriteStageView(executor: authentik.flows.views.executor.FlowExecutorView, **kwargs)
36    def __init__(self, executor: FlowExecutorView, **kwargs):
37        super().__init__(executor, **kwargs)
38        self.disallowed_user_attributes = [
39            "groups",
40            # Block attribute writes that would otherwise land on the model's
41            # primary key. An IdP that returns an `id` claim (mocksaml is one
42            # example) used to crash the enrollment flow with
43            # ValueError: Field 'id' expected a number but got '<hex>'
44            # because hasattr(user, "id") is true and setattr(user, "id", ...)
45            # was taken unchecked. See #21580.
46            "id",
47            "pk",
48        ]

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

disallowed_user_attributes
@staticmethod
def write_attribute(user: authentik.core.models.User, key: str, value: Any):
50    @staticmethod
51    def write_attribute(user: User, key: str, value: Any):
52        """Allow use of attributes.foo.bar when writing to a user, with full
53        recursion"""
54        parts = key.replace("attributes_", "attributes.", 1).split(".")
55        if len(parts) < 1:  # pragma: no cover
56            return
57        # Function will always be called with a key like attributes.
58        # this is just a sanity check to ensure that is removed
59        if parts[0] == "attributes":
60            parts = parts[1:]
61        set_path_in_dict(user.attributes, ".".join(parts), sanitize_item(value))

Allow use of attributes.foo.bar when writing to a user, with full recursion

def ensure_user(self) -> tuple[authentik.core.models.User | None, bool]:
 63    def ensure_user(self) -> tuple[User | None, bool]:
 64        """Ensure a user exists"""
 65        user_created = False
 66        path = self.executor.plan.context.get(
 67            PLAN_CONTEXT_USER_PATH, self.executor.current_stage.user_path_template
 68        )
 69        if path == "":
 70            path = User.default_path()
 71
 72        try:
 73            user_type = UserTypes(
 74                self.executor.plan.context.get(
 75                    PLAN_CONTEXT_USER_TYPE,
 76                    self.executor.current_stage.user_type,
 77                )
 78            )
 79        except ValueError:
 80            user_type = self.executor.current_stage.user_type
 81        if user_type == UserTypes.INTERNAL_SERVICE_ACCOUNT:
 82            user_type = UserTypes.SERVICE_ACCOUNT
 83
 84        if not self.request.user.is_anonymous:
 85            self.executor.plan.context.setdefault(PLAN_CONTEXT_PENDING_USER, self.request.user)
 86        if (
 87            PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context
 88            or self.executor.current_stage.user_creation_mode == UserCreationMode.ALWAYS_CREATE
 89        ):
 90            if self.executor.current_stage.user_creation_mode == UserCreationMode.NEVER_CREATE:
 91                return None, False
 92            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
 93                is_active=not self.executor.current_stage.create_users_as_inactive,
 94                path=path,
 95                type=user_type,
 96            )
 97            self.executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = BACKEND_INBUILT
 98            self.logger.debug(
 99                "Created new user",
100                flow_slug=self.executor.flow.slug,
101            )
102            user_created = True
103        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
104        return user, user_created

Ensure a user exists

def update_user(self, user: authentik.core.models.User):
106    def update_user(self, user: User):
107        """Update `user` with data from plan context
108
109        Only simple attributes are updated, nothing which requires a foreign key or m2m"""
110        data: dict = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
111        # This is always sent back but not written to the user
112        data.pop("component", None)
113        for key, value in data.items():
114            setter_name = f"set_{key}"
115            # Check if user has a setter for this key, like set_password
116            if key == "password":
117                user.set_password(value, request=self.request)
118            elif hasattr(user, setter_name):
119                setter = getattr(user, setter_name)
120                if callable(setter):
121                    setter(value)
122            elif key in self.disallowed_user_attributes:
123                self.logger.info("discarding key", key=key)
124                continue
125            # For exact attributes match, update the dictionary in place
126            elif key == "attributes":
127                if isinstance(value, dict):
128                    user.attributes.update(sanitize_dict(value))
129                else:
130                    raise ValidationError("Attempt to overwrite complete attributes")
131            # If using dot notation, use the correct helper to update the nested value
132            elif key.startswith("attributes.") or key.startswith("attributes_"):
133                UserWriteStageView.write_attribute(user, key, value)
134            # User has this key already
135            elif hasattr(user, key):
136                if isinstance(user, SimpleLazyObject):
137                    user._setup()
138                    user = user._wrapped
139                attr = getattr(type(user), key)
140                if isinstance(attr, property):
141                    if not attr.fset:
142                        self.logger.info("discarding key", key=key)
143                        continue
144                setattr(user, key, value)
145            # If none of the cases above matched, we have an attribute that the user doesn't have,
146            # has no setter for, is not a nested attributes value and as such is invalid
147            else:
148                self.logger.info("discarding key", key=key)
149                continue
150        # Check if we're writing from a source, and save the source to the attributes
151        if PLAN_CONTEXT_SOURCES_CONNECTION in self.executor.plan.context:
152            if USER_ATTRIBUTE_SOURCES not in user.attributes or not isinstance(
153                user.attributes.get(USER_ATTRIBUTE_SOURCES), list
154            ):
155                user.attributes[USER_ATTRIBUTE_SOURCES] = []
156            connection: UserSourceConnection = self.executor.plan.context[
157                PLAN_CONTEXT_SOURCES_CONNECTION
158            ]
159            if connection.source.name not in user.attributes[USER_ATTRIBUTE_SOURCES]:
160                user.attributes[USER_ATTRIBUTE_SOURCES].append(connection.source.name)

Update user with data from plan context

Only simple attributes are updated; nothing that requires a foreign key or many-to-many (m2m) relation is written.

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
162    def dispatch(self, request: HttpRequest) -> HttpResponse:
163        """Save data in the current flow to the currently pending user. If no user is pending,
164        a new user is created."""
165        if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
166            message = _("No Pending data.")
167            self.logger.debug(message)
168            return self.executor.stage_invalid(message)
169        data = self.executor.plan.context[PLAN_CONTEXT_PROMPT]
170        user, user_created = self.ensure_user()
171        if not user:
172            message = _("No user found and can't create new user.")
173            self.logger.info(message)
174            return self.executor.stage_invalid(message)
175        # Before we change anything, check if the user is the same as in the request
176        # and we're updating a password. In that case we need to update the session hash
177        # Also check that we're not currently impersonating, so we don't update the session
178        should_update_session = False
179        if (
180            any("password" in x for x in data.keys())
181            and self.request.user.pk == user.pk
182            and SESSION_KEY_IMPERSONATE_USER not in self.request.session
183        ):
184            should_update_session = True
185        try:
186            self.update_user(user)
187        except ValidationError as exc:
188            self.logger.warning("failed to update user", exc=exc)
189            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
190        # Extra check to prevent flows from saving a user with a blank username
191        if user.username == "":
192            self.logger.warning("Aborting write to empty username", user=user)
193            return self.executor.stage_invalid()
194        try:
195            with transaction.atomic():
196                user.save()
197                if self.executor.current_stage.create_users_group:
198                    user.groups.add(self.executor.current_stage.create_users_group)
199                if PLAN_CONTEXT_GROUPS in self.executor.plan.context:
200                    user.groups.add(*self.executor.plan.context[PLAN_CONTEXT_GROUPS])
201        except (IntegrityError, ValueError, TypeError, InternalError) as exc:
202            self.logger.warning("Failed to save user", exc=exc)
203            return self.executor.stage_invalid(_("Failed to update user. Please try again later."))
204        user_write.send(sender=self, request=request, user=user, data=data, created=user_created)
205        # Check if the password has been updated, and update the session auth hash
206        if should_update_session:
207            update_session_auth_hash(self.request, user)
208            self.logger.debug("Updated session hash", user=user)
209        self.logger.debug(
210            "Updated existing user",
211            user=user,
212            flow_slug=self.executor.flow.slug,
213        )
214        return self.executor.stage_ok()

Save data in the current flow to the currently pending user. If no user is pending, a new user is created.