authentik.core.sources.flow_manager

Source decision helper

  1"""Source decision helper"""
  2
  3from typing import Any
  4
  5from django.contrib import messages
  6from django.db import IntegrityError, transaction
  7from django.http import HttpRequest, HttpResponse
  8from django.shortcuts import redirect
  9from django.urls import reverse
 10from django.utils.translation import gettext as _
 11from structlog.stdlib import get_logger
 12
 13from authentik.core.models import (
 14    Group,
 15    GroupSourceConnection,
 16    Source,
 17    User,
 18    UserSourceConnection,
 19)
 20from authentik.core.sources.mapper import SourceMapper
 21from authentik.core.sources.matcher import Action, SourceMatcher
 22from authentik.core.sources.stage import (
 23    PLAN_CONTEXT_SOURCES_CONNECTION,
 24    PostSourceStage,
 25)
 26from authentik.events.models import Event, EventAction
 27from authentik.flows.exceptions import FlowNonApplicableException
 28from authentik.flows.models import Flow, FlowToken, Stage, in_memory_stage
 29from authentik.flows.planner import (
 30    PLAN_CONTEXT_IS_RESTORED,
 31    PLAN_CONTEXT_PENDING_USER,
 32    PLAN_CONTEXT_REDIRECT,
 33    PLAN_CONTEXT_SOURCE,
 34    PLAN_CONTEXT_SSO,
 35    FlowPlanner,
 36)
 37from authentik.flows.stage import StageView
 38from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
 39from authentik.lib.views import bad_request_message
 40from authentik.policies.denied import AccessDeniedResponse
 41from authentik.policies.utils import delete_none_values
 42from authentik.stages.password import BACKEND_INBUILT
 43from authentik.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
 44from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 45from authentik.stages.user_write.stage import PLAN_CONTEXT_USER_PATH
 46
 47LOGGER = get_logger()
 48
 49PLAN_CONTEXT_SOURCE_GROUPS = "source_groups"
 50SESSION_KEY_SOURCE_FLOW_STAGES = "authentik/flows/source_flow_stages"
 51SESSION_KEY_SOURCE_FLOW_CONTEXT = "authentik/flows/source_flow_context"
 52SESSION_KEY_OVERRIDE_FLOW_TOKEN = "authentik/flows/source_override_flow_token"  # nosec
 53
 54
 55class MessageStage(StageView):
 56    """Show a pre-configured message after the flow is done"""
 57
 58    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 59        """Show a pre-configured message after the flow is done"""
 60        message = getattr(self.executor.current_stage, "message", "")
 61        level = getattr(self.executor.current_stage, "level", messages.SUCCESS)
 62        messages.add_message(
 63            self.request,
 64            level,
 65            message,
 66        )
 67        return self.executor.stage_ok()
 68
 69
 70class SourceFlowManager:
 71    """Help sources decide what they should do after authorization. Based on source settings and
 72    previous connections, authenticate the user, enroll a new user, link to an existing user
 73    or deny the request."""
 74
 75    source: Source
 76    mapper: SourceMapper
 77    matcher: SourceMatcher
 78    request: HttpRequest
 79
 80    identifier: str
 81
 82    user_connection_type: type[UserSourceConnection]
 83    group_connection_type: type[GroupSourceConnection]
 84
 85    user_info: dict[str, Any]
 86    policy_context: dict[str, Any]
 87    user_properties: dict[str, Any | dict[str, Any]]
 88    groups_properties: dict[str, dict[str, Any | dict[str, Any]]]
 89
 90    def __init__(
 91        self,
 92        source: Source,
 93        request: HttpRequest,
 94        identifier: str,
 95        user_info: dict[str, Any],
 96        policy_context: dict[str, Any],
 97    ) -> None:
 98        self.source = source
 99        self.mapper = SourceMapper(self.source)
100        self.matcher = SourceMatcher(
101            self.source, self.user_connection_type, self.group_connection_type
102        )
103        self.request = request
104        self.identifier = identifier
105        self.user_info = user_info
106        self._logger = get_logger().bind(source=source, identifier=identifier)
107        self.policy_context = policy_context
108
109        self.user_properties = self.mapper.build_object_properties(
110            object_type=User, request=request, user=None, **self.user_info
111        )
112        self.groups_properties = {
113            group_id: self.mapper.build_object_properties(
114                object_type=Group,
115                request=request,
116                user=None,
117                group_id=group_id,
118                **self.user_info,
119            )
120            for group_id in self.user_properties.setdefault("groups", [])
121        }
122        del self.user_properties["groups"]
123
124    def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]:  # noqa: PLR0911
125        """decide which action should be taken"""
126        # When request is authenticated, always link
127        if self.request.user.is_authenticated:
128            new_connection = self.user_connection_type(
129                source=self.source, identifier=self.identifier
130            )
131            new_connection.user = self.request.user
132            new_connection = self.update_user_connection(new_connection, **kwargs)
133            if existing := self.user_connection_type.objects.filter(
134                source=self.source, identifier=self.identifier
135            ).first():
136                existing = self.update_user_connection(existing)
137                return Action.AUTH, existing
138            return Action.LINK, new_connection
139
140        action, connection = self.matcher.get_user_action(self.identifier, self.user_properties)
141        if connection:
142            connection = self.update_user_connection(connection, **kwargs)
143        return action, connection
144
145    def update_user_connection(
146        self, connection: UserSourceConnection, **kwargs
147    ) -> UserSourceConnection:  # pragma: no cover
148        """Optionally make changes to the user connection after it is looked up/created."""
149        return connection
150
151    def get_flow(self, **kwargs) -> HttpResponse:
152        """Get the flow response based on user_matching_mode"""
153        try:
154            action, connection = self.get_action(**kwargs)
155        except IntegrityError as exc:
156            self._logger.warning("failed to get action", exc=exc)
157            return redirect(reverse("authentik_core:root-redirect"))
158        self._logger.debug("get_action", action=action, connection=connection)
159        try:
160            if connection:
161                if action == Action.LINK:
162                    self._logger.debug("Linking existing user")
163                    return self.handle_existing_link(connection)
164                if action == Action.AUTH:
165                    self._logger.debug("Handling auth user")
166                    return self.handle_auth(connection)
167                if action == Action.ENROLL:
168                    self._logger.debug("Handling enrollment of new user")
169                    return self.handle_enroll(connection)
170        except FlowNonApplicableException as exc:
171            self._logger.warning("Flow non applicable", exc=exc)
172            return self.error_handler(exc)
173        # Default case, assume deny
174        error = Exception(
175            _(
176                "Request to authenticate with {source} has been denied. Please authenticate "
177                "with the source you've previously signed up with.".format_map(
178                    {"source": self.source.name}
179                )
180            ),
181        )
182        return self.error_handler(error)
183
184    def error_handler(self, error: Exception) -> HttpResponse:
185        """Handle any errors by returning an access denied stage"""
186        response = AccessDeniedResponse(self.request)
187        response.error_message = str(error)
188        if isinstance(error, FlowNonApplicableException):
189            response.policy_result = error.policy_result
190            response.error_message = error.messages
191        return response
192
193    def get_stages_to_append(self, flow: Flow) -> list[Stage]:
194        """Hook to override stages which are appended to the flow"""
195        return [
196            in_memory_stage(PostSourceStage),
197        ]
198
199    def _prepare_flow(
200        self,
201        flow: Flow | None,
202        connection: UserSourceConnection,
203        stages: list[StageView] | None = None,
204        **flow_context,
205    ) -> HttpResponse:
206        """Prepare Authentication Plan, redirect user FlowExecutor"""
207        # Ensure redirect is carried through when user was trying to
208        # authorize application
209        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
210            NEXT_ARG_NAME, "authentik_core:if-user"
211        )
212        flow_context.update(
213            {
214                # Since we authenticate the user by their token, they have no backend set
215                PLAN_CONTEXT_AUTHENTICATION_BACKEND: BACKEND_INBUILT,
216                PLAN_CONTEXT_SSO: True,
217                PLAN_CONTEXT_SOURCE: self.source,
218                PLAN_CONTEXT_SOURCES_CONNECTION: connection,
219                PLAN_CONTEXT_SOURCE_GROUPS: self.groups_properties,
220            }
221        )
222        flow_context.update(self.policy_context)
223        flow_context.setdefault(PLAN_CONTEXT_REDIRECT, final_redirect)
224
225        if not flow:
226            # We only check for the flow token here if we don't have a flow, otherwise we rely on
227            # SESSION_KEY_SOURCE_FLOW_STAGES to delegate the usage of this token and dynamically add
228            # stages that deal with this token to return to another flow
229            if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session:
230                token: FlowToken = self.request.session.get(SESSION_KEY_OVERRIDE_FLOW_TOKEN)
231                self._logger.info(
232                    "Replacing source flow with overridden flow", flow=token.flow.slug
233                )
234                plan = token.plan
235                plan.context[PLAN_CONTEXT_IS_RESTORED] = token
236                plan.context.update(flow_context)
237                for stage in self.get_stages_to_append(flow):
238                    plan.append_stage(stage)
239                if stages:
240                    for stage in stages:
241                        plan.append_stage(stage)
242                redirect = plan.to_redirect(self.request, token.flow)
243                token.delete()
244                return redirect
245            return bad_request_message(
246                self.request,
247                _("Configured flow does not exist."),
248            )
249        # We run the Flow planner here so we can pass the Pending user in the context
250        planner = FlowPlanner(flow)
251        # We append some stages so the initial flow we get might be empty
252        planner.allow_empty_flows = True
253        planner.use_cache = False
254        plan = planner.plan(self.request, flow_context)
255        for stage in self.get_stages_to_append(flow):
256            plan.append_stage(stage)
257        plan.append_stage(
258            in_memory_stage(GroupUpdateStage, group_connection_type=self.group_connection_type)
259        )
260        if stages:
261            for stage in stages:
262                plan.append_stage(stage)
263        for stage in self.request.session.get(SESSION_KEY_SOURCE_FLOW_STAGES, []):
264            plan.append_stage(stage)
265        plan.context.update(self.request.session.get(SESSION_KEY_SOURCE_FLOW_CONTEXT, {}))
266        return plan.to_redirect(self.request, flow)
267
268    def handle_auth(
269        self,
270        connection: UserSourceConnection,
271    ) -> HttpResponse:
272        """Login user and redirect."""
273        return self._prepare_flow(
274            self.source.authentication_flow,
275            connection,
276            stages=[
277                in_memory_stage(
278                    MessageStage,
279                    message=_(
280                        "Successfully authenticated with {source}!".format_map(
281                            {"source": self.source.name}
282                        )
283                    ),
284                )
285            ],
286            **{
287                PLAN_CONTEXT_PENDING_USER: connection.user,
288                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
289                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
290            },
291        )
292
293    def handle_existing_link(
294        self,
295        connection: UserSourceConnection,
296    ) -> HttpResponse:
297        """Handler when the user was already authenticated and linked an external source
298        to their account."""
299        # When request isn't authenticated we jump straight to auth
300        if not self.request.user.is_authenticated:
301            return self.handle_auth(connection)
302        # When an override flow token exists we actually still use a flow for link
303        # to continue the existing flow we came from
304        if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session:
305            return self._prepare_flow(None, connection)
306        connection.save()
307        Event.new(
308            EventAction.SOURCE_LINKED,
309            message="Linked Source",
310            source=self.source,
311        ).from_http(self.request)
312        messages.success(
313            self.request,
314            _("Successfully linked {source}!".format_map({"source": self.source.name})),
315        )
316        return redirect(
317            reverse(
318                "authentik_core:if-user",
319            )
320            + "#/settings;page-sources"
321        )
322
323    def handle_enroll(
324        self,
325        connection: UserSourceConnection,
326    ) -> HttpResponse:
327        """User was not authenticated and previous request was not authenticated."""
328        # We run the Flow planner here so we can pass the Pending user in the context
329        if not self.source.enrollment_flow:
330            self._logger.warning("source has no enrollment flow")
331            return bad_request_message(
332                self.request,
333                _("Source is not configured for enrollment."),
334            )
335        return self._prepare_flow(
336            self.source.enrollment_flow,
337            connection,
338            stages=[
339                in_memory_stage(
340                    MessageStage,
341                    message=_(
342                        "Successfully authenticated with {source}!".format_map(
343                            {"source": self.source.name}
344                        )
345                    ),
346                )
347            ],
348            **{
349                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
350                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
351            },
352        )
353
354
355class GroupUpdateStage(StageView):
356    """Dynamically injected stage which updates the user after enrollment/authentication."""
357
358    def handle_group(
359        self, group_id: str, group_properties: dict[str, Any | dict[str, Any]]
360    ) -> Group | None:
361        action, connection = self.matcher.get_group_action(group_id, group_properties)
362        if action == Action.ENROLL:
363            group = Group.objects.create(**group_properties)
364            connection.group = group
365            connection.save()
366            return group
367        elif action in (Action.LINK, Action.AUTH):
368            group = connection.group
369            group.update_attributes(group_properties)
370            connection.save()
371            return group
372
373        return None
374
375    def handle_groups(self) -> bool:
376        self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE]
377        self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
378        self.group_connection_type: GroupSourceConnection = (
379            self.executor.current_stage.group_connection_type
380        )
381        self.matcher = SourceMatcher(self.source, None, self.group_connection_type)
382
383        raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[
384            PLAN_CONTEXT_SOURCE_GROUPS
385        ]
386        groups: list[Group] = []
387
388        for group_id, group_properties in raw_groups.items():
389            group = self.handle_group(group_id, group_properties)
390            if not group:
391                return False
392            groups.append(group)
393
394        with transaction.atomic():
395            self.user.groups.remove(
396                *self.user.groups.filter(groupsourceconnection__source=self.source)
397            )
398            self.user.groups.add(*groups)
399
400        return True
401
402    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
403        """Stage used after the user has been enrolled to sync their groups from source data"""
404        if self.handle_groups():
405            return self.executor.stage_ok()
406        else:
407            return self.executor.stage_invalid("Failed to update groups. Please try again later.")
408
409    def post(self, request: HttpRequest) -> HttpResponse:
410        """Wrapper for post requests"""
411        return self.get(request)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_SOURCE_GROUPS = 'source_groups'
SESSION_KEY_SOURCE_FLOW_STAGES = 'authentik/flows/source_flow_stages'
SESSION_KEY_SOURCE_FLOW_CONTEXT = 'authentik/flows/source_flow_context'
SESSION_KEY_OVERRIDE_FLOW_TOKEN = 'authentik/flows/source_override_flow_token'
class MessageStage(authentik.flows.stage.StageView):
56class MessageStage(StageView):
57    """Show a pre-configured message after the flow is done"""
58
59    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
60        """Show a pre-configured message after the flow is done"""
61        message = getattr(self.executor.current_stage, "message", "")
62        level = getattr(self.executor.current_stage, "level", messages.SUCCESS)
63        messages.add_message(
64            self.request,
65            level,
66            message,
67        )
68        return self.executor.stage_ok()

Show a pre-configured message after the flow is done

def dispatch( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
59    def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
60        """Show a pre-configured message after the flow is done"""
61        message = getattr(self.executor.current_stage, "message", "")
62        level = getattr(self.executor.current_stage, "level", messages.SUCCESS)
63        messages.add_message(
64            self.request,
65            level,
66            message,
67        )
68        return self.executor.stage_ok()

Show a pre-configured message after the flow is done

class SourceFlowManager:
 71class SourceFlowManager:
 72    """Help sources decide what they should do after authorization. Based on source settings and
 73    previous connections, authenticate the user, enroll a new user, link to an existing user
 74    or deny the request."""
 75
 76    source: Source
 77    mapper: SourceMapper
 78    matcher: SourceMatcher
 79    request: HttpRequest
 80
 81    identifier: str
 82
 83    user_connection_type: type[UserSourceConnection]
 84    group_connection_type: type[GroupSourceConnection]
 85
 86    user_info: dict[str, Any]
 87    policy_context: dict[str, Any]
 88    user_properties: dict[str, Any | dict[str, Any]]
 89    groups_properties: dict[str, dict[str, Any | dict[str, Any]]]
 90
 91    def __init__(
 92        self,
 93        source: Source,
 94        request: HttpRequest,
 95        identifier: str,
 96        user_info: dict[str, Any],
 97        policy_context: dict[str, Any],
 98    ) -> None:
 99        self.source = source
100        self.mapper = SourceMapper(self.source)
101        self.matcher = SourceMatcher(
102            self.source, self.user_connection_type, self.group_connection_type
103        )
104        self.request = request
105        self.identifier = identifier
106        self.user_info = user_info
107        self._logger = get_logger().bind(source=source, identifier=identifier)
108        self.policy_context = policy_context
109
110        self.user_properties = self.mapper.build_object_properties(
111            object_type=User, request=request, user=None, **self.user_info
112        )
113        self.groups_properties = {
114            group_id: self.mapper.build_object_properties(
115                object_type=Group,
116                request=request,
117                user=None,
118                group_id=group_id,
119                **self.user_info,
120            )
121            for group_id in self.user_properties.setdefault("groups", [])
122        }
123        del self.user_properties["groups"]
124
125    def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]:  # noqa: PLR0911
126        """decide which action should be taken"""
127        # When request is authenticated, always link
128        if self.request.user.is_authenticated:
129            new_connection = self.user_connection_type(
130                source=self.source, identifier=self.identifier
131            )
132            new_connection.user = self.request.user
133            new_connection = self.update_user_connection(new_connection, **kwargs)
134            if existing := self.user_connection_type.objects.filter(
135                source=self.source, identifier=self.identifier
136            ).first():
137                existing = self.update_user_connection(existing)
138                return Action.AUTH, existing
139            return Action.LINK, new_connection
140
141        action, connection = self.matcher.get_user_action(self.identifier, self.user_properties)
142        if connection:
143            connection = self.update_user_connection(connection, **kwargs)
144        return action, connection
145
146    def update_user_connection(
147        self, connection: UserSourceConnection, **kwargs
148    ) -> UserSourceConnection:  # pragma: no cover
149        """Optionally make changes to the user connection after it is looked up/created."""
150        return connection
151
152    def get_flow(self, **kwargs) -> HttpResponse:
153        """Get the flow response based on user_matching_mode"""
154        try:
155            action, connection = self.get_action(**kwargs)
156        except IntegrityError as exc:
157            self._logger.warning("failed to get action", exc=exc)
158            return redirect(reverse("authentik_core:root-redirect"))
159        self._logger.debug("get_action", action=action, connection=connection)
160        try:
161            if connection:
162                if action == Action.LINK:
163                    self._logger.debug("Linking existing user")
164                    return self.handle_existing_link(connection)
165                if action == Action.AUTH:
166                    self._logger.debug("Handling auth user")
167                    return self.handle_auth(connection)
168                if action == Action.ENROLL:
169                    self._logger.debug("Handling enrollment of new user")
170                    return self.handle_enroll(connection)
171        except FlowNonApplicableException as exc:
172            self._logger.warning("Flow non applicable", exc=exc)
173            return self.error_handler(exc)
174        # Default case, assume deny
175        error = Exception(
176            _(
177                "Request to authenticate with {source} has been denied. Please authenticate "
178                "with the source you've previously signed up with.".format_map(
179                    {"source": self.source.name}
180                )
181            ),
182        )
183        return self.error_handler(error)
184
185    def error_handler(self, error: Exception) -> HttpResponse:
186        """Handle any errors by returning an access denied stage"""
187        response = AccessDeniedResponse(self.request)
188        response.error_message = str(error)
189        if isinstance(error, FlowNonApplicableException):
190            response.policy_result = error.policy_result
191            response.error_message = error.messages
192        return response
193
194    def get_stages_to_append(self, flow: Flow) -> list[Stage]:
195        """Hook to override stages which are appended to the flow"""
196        return [
197            in_memory_stage(PostSourceStage),
198        ]
199
200    def _prepare_flow(
201        self,
202        flow: Flow | None,
203        connection: UserSourceConnection,
204        stages: list[StageView] | None = None,
205        **flow_context,
206    ) -> HttpResponse:
207        """Prepare Authentication Plan, redirect user FlowExecutor"""
208        # Ensure redirect is carried through when user was trying to
209        # authorize application
210        final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get(
211            NEXT_ARG_NAME, "authentik_core:if-user"
212        )
213        flow_context.update(
214            {
215                # Since we authenticate the user by their token, they have no backend set
216                PLAN_CONTEXT_AUTHENTICATION_BACKEND: BACKEND_INBUILT,
217                PLAN_CONTEXT_SSO: True,
218                PLAN_CONTEXT_SOURCE: self.source,
219                PLAN_CONTEXT_SOURCES_CONNECTION: connection,
220                PLAN_CONTEXT_SOURCE_GROUPS: self.groups_properties,
221            }
222        )
223        flow_context.update(self.policy_context)
224        flow_context.setdefault(PLAN_CONTEXT_REDIRECT, final_redirect)
225
226        if not flow:
227            # We only check for the flow token here if we don't have a flow, otherwise we rely on
228            # SESSION_KEY_SOURCE_FLOW_STAGES to delegate the usage of this token and dynamically add
229            # stages that deal with this token to return to another flow
230            if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session:
231                token: FlowToken = self.request.session.get(SESSION_KEY_OVERRIDE_FLOW_TOKEN)
232                self._logger.info(
233                    "Replacing source flow with overridden flow", flow=token.flow.slug
234                )
235                plan = token.plan
236                plan.context[PLAN_CONTEXT_IS_RESTORED] = token
237                plan.context.update(flow_context)
238                for stage in self.get_stages_to_append(flow):
239                    plan.append_stage(stage)
240                if stages:
241                    for stage in stages:
242                        plan.append_stage(stage)
243                redirect = plan.to_redirect(self.request, token.flow)
244                token.delete()
245                return redirect
246            return bad_request_message(
247                self.request,
248                _("Configured flow does not exist."),
249            )
250        # We run the Flow planner here so we can pass the Pending user in the context
251        planner = FlowPlanner(flow)
252        # We append some stages so the initial flow we get might be empty
253        planner.allow_empty_flows = True
254        planner.use_cache = False
255        plan = planner.plan(self.request, flow_context)
256        for stage in self.get_stages_to_append(flow):
257            plan.append_stage(stage)
258        plan.append_stage(
259            in_memory_stage(GroupUpdateStage, group_connection_type=self.group_connection_type)
260        )
261        if stages:
262            for stage in stages:
263                plan.append_stage(stage)
264        for stage in self.request.session.get(SESSION_KEY_SOURCE_FLOW_STAGES, []):
265            plan.append_stage(stage)
266        plan.context.update(self.request.session.get(SESSION_KEY_SOURCE_FLOW_CONTEXT, {}))
267        return plan.to_redirect(self.request, flow)
268
269    def handle_auth(
270        self,
271        connection: UserSourceConnection,
272    ) -> HttpResponse:
273        """Login user and redirect."""
274        return self._prepare_flow(
275            self.source.authentication_flow,
276            connection,
277            stages=[
278                in_memory_stage(
279                    MessageStage,
280                    message=_(
281                        "Successfully authenticated with {source}!".format_map(
282                            {"source": self.source.name}
283                        )
284                    ),
285                )
286            ],
287            **{
288                PLAN_CONTEXT_PENDING_USER: connection.user,
289                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
290                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
291            },
292        )
293
294    def handle_existing_link(
295        self,
296        connection: UserSourceConnection,
297    ) -> HttpResponse:
298        """Handler when the user was already authenticated and linked an external source
299        to their account."""
300        # When request isn't authenticated we jump straight to auth
301        if not self.request.user.is_authenticated:
302            return self.handle_auth(connection)
303        # When an override flow token exists we actually still use a flow for link
304        # to continue the existing flow we came from
305        if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session:
306            return self._prepare_flow(None, connection)
307        connection.save()
308        Event.new(
309            EventAction.SOURCE_LINKED,
310            message="Linked Source",
311            source=self.source,
312        ).from_http(self.request)
313        messages.success(
314            self.request,
315            _("Successfully linked {source}!".format_map({"source": self.source.name})),
316        )
317        return redirect(
318            reverse(
319                "authentik_core:if-user",
320            )
321            + "#/settings;page-sources"
322        )
323
324    def handle_enroll(
325        self,
326        connection: UserSourceConnection,
327    ) -> HttpResponse:
328        """User was not authenticated and previous request was not authenticated."""
329        # We run the Flow planner here so we can pass the Pending user in the context
330        if not self.source.enrollment_flow:
331            self._logger.warning("source has no enrollment flow")
332            return bad_request_message(
333                self.request,
334                _("Source is not configured for enrollment."),
335            )
336        return self._prepare_flow(
337            self.source.enrollment_flow,
338            connection,
339            stages=[
340                in_memory_stage(
341                    MessageStage,
342                    message=_(
343                        "Successfully authenticated with {source}!".format_map(
344                            {"source": self.source.name}
345                        )
346                    ),
347                )
348            ],
349            **{
350                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
351                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
352            },
353        )

Help sources decide what they should do after authorization. Based on source settings and previous connections, authenticate the user, enroll a new user, link to an existing user or deny the request.

SourceFlowManager( source: authentik.core.models.Source, request: django.http.request.HttpRequest, identifier: str, user_info: dict[str, typing.Any], policy_context: dict[str, typing.Any])
 91    def __init__(
 92        self,
 93        source: Source,
 94        request: HttpRequest,
 95        identifier: str,
 96        user_info: dict[str, Any],
 97        policy_context: dict[str, Any],
 98    ) -> None:
 99        self.source = source
100        self.mapper = SourceMapper(self.source)
101        self.matcher = SourceMatcher(
102            self.source, self.user_connection_type, self.group_connection_type
103        )
104        self.request = request
105        self.identifier = identifier
106        self.user_info = user_info
107        self._logger = get_logger().bind(source=source, identifier=identifier)
108        self.policy_context = policy_context
109
110        self.user_properties = self.mapper.build_object_properties(
111            object_type=User, request=request, user=None, **self.user_info
112        )
113        self.groups_properties = {
114            group_id: self.mapper.build_object_properties(
115                object_type=Group,
116                request=request,
117                user=None,
118                group_id=group_id,
119                **self.user_info,
120            )
121            for group_id in self.user_properties.setdefault("groups", [])
122        }
123        del self.user_properties["groups"]
request: django.http.request.HttpRequest
identifier: str
user_connection_type: type[authentik.core.models.UserSourceConnection]
group_connection_type: type[authentik.core.models.GroupSourceConnection]
user_info: dict[str, typing.Any]
policy_context: dict[str, typing.Any]
user_properties: dict[str, typing.Any | dict[str, typing.Any]]
groups_properties: dict[str, dict[str, typing.Any | dict[str, typing.Any]]]
def get_action( self, **kwargs) -> tuple[authentik.core.sources.matcher.Action, authentik.core.models.UserSourceConnection | None]:
125    def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]:  # noqa: PLR0911
126        """decide which action should be taken"""
127        # When request is authenticated, always link
128        if self.request.user.is_authenticated:
129            new_connection = self.user_connection_type(
130                source=self.source, identifier=self.identifier
131            )
132            new_connection.user = self.request.user
133            new_connection = self.update_user_connection(new_connection, **kwargs)
134            if existing := self.user_connection_type.objects.filter(
135                source=self.source, identifier=self.identifier
136            ).first():
137                existing = self.update_user_connection(existing)
138                return Action.AUTH, existing
139            return Action.LINK, new_connection
140
141        action, connection = self.matcher.get_user_action(self.identifier, self.user_properties)
142        if connection:
143            connection = self.update_user_connection(connection, **kwargs)
144        return action, connection

decide which action should be taken

def update_user_connection( self, connection: authentik.core.models.UserSourceConnection, **kwargs) -> authentik.core.models.UserSourceConnection:
146    def update_user_connection(
147        self, connection: UserSourceConnection, **kwargs
148    ) -> UserSourceConnection:  # pragma: no cover
149        """Optionally make changes to the user connection after it is looked up/created."""
150        return connection

Optionally make changes to the user connection after it is looked up/created.

def get_flow(self, **kwargs) -> django.http.response.HttpResponse:
152    def get_flow(self, **kwargs) -> HttpResponse:
153        """Get the flow response based on user_matching_mode"""
154        try:
155            action, connection = self.get_action(**kwargs)
156        except IntegrityError as exc:
157            self._logger.warning("failed to get action", exc=exc)
158            return redirect(reverse("authentik_core:root-redirect"))
159        self._logger.debug("get_action", action=action, connection=connection)
160        try:
161            if connection:
162                if action == Action.LINK:
163                    self._logger.debug("Linking existing user")
164                    return self.handle_existing_link(connection)
165                if action == Action.AUTH:
166                    self._logger.debug("Handling auth user")
167                    return self.handle_auth(connection)
168                if action == Action.ENROLL:
169                    self._logger.debug("Handling enrollment of new user")
170                    return self.handle_enroll(connection)
171        except FlowNonApplicableException as exc:
172            self._logger.warning("Flow non applicable", exc=exc)
173            return self.error_handler(exc)
174        # Default case, assume deny
175        error = Exception(
176            _(
177                "Request to authenticate with {source} has been denied. Please authenticate "
178                "with the source you've previously signed up with.".format_map(
179                    {"source": self.source.name}
180                )
181            ),
182        )
183        return self.error_handler(error)

Get the flow response based on user_matching_mode

def error_handler(self, error: Exception) -> django.http.response.HttpResponse:
185    def error_handler(self, error: Exception) -> HttpResponse:
186        """Handle any errors by returning an access denied stage"""
187        response = AccessDeniedResponse(self.request)
188        response.error_message = str(error)
189        if isinstance(error, FlowNonApplicableException):
190            response.policy_result = error.policy_result
191            response.error_message = error.messages
192        return response

Handle any errors by returning an access denied stage

def get_stages_to_append( self, flow: authentik.flows.models.Flow) -> list[authentik.flows.models.Stage]:
194    def get_stages_to_append(self, flow: Flow) -> list[Stage]:
195        """Hook to override stages which are appended to the flow"""
196        return [
197            in_memory_stage(PostSourceStage),
198        ]

Hook to override stages which are appended to the flow

def handle_auth( self, connection: authentik.core.models.UserSourceConnection) -> django.http.response.HttpResponse:
269    def handle_auth(
270        self,
271        connection: UserSourceConnection,
272    ) -> HttpResponse:
273        """Login user and redirect."""
274        return self._prepare_flow(
275            self.source.authentication_flow,
276            connection,
277            stages=[
278                in_memory_stage(
279                    MessageStage,
280                    message=_(
281                        "Successfully authenticated with {source}!".format_map(
282                            {"source": self.source.name}
283                        )
284                    ),
285                )
286            ],
287            **{
288                PLAN_CONTEXT_PENDING_USER: connection.user,
289                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
290                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
291            },
292        )

Login user and redirect.

def handle_enroll( self, connection: authentik.core.models.UserSourceConnection) -> django.http.response.HttpResponse:
324    def handle_enroll(
325        self,
326        connection: UserSourceConnection,
327    ) -> HttpResponse:
328        """User was not authenticated and previous request was not authenticated."""
329        # We run the Flow planner here so we can pass the Pending user in the context
330        if not self.source.enrollment_flow:
331            self._logger.warning("source has no enrollment flow")
332            return bad_request_message(
333                self.request,
334                _("Source is not configured for enrollment."),
335            )
336        return self._prepare_flow(
337            self.source.enrollment_flow,
338            connection,
339            stages=[
340                in_memory_stage(
341                    MessageStage,
342                    message=_(
343                        "Successfully authenticated with {source}!".format_map(
344                            {"source": self.source.name}
345                        )
346                    ),
347                )
348            ],
349            **{
350                PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties),
351                PLAN_CONTEXT_USER_PATH: self.source.get_user_path(),
352            },
353        )

User was not authenticated and previous request was not authenticated.

class GroupUpdateStage(authentik.flows.stage.StageView):
356class GroupUpdateStage(StageView):
357    """Dynamically injected stage which updates the user after enrollment/authentication."""
358
359    def handle_group(
360        self, group_id: str, group_properties: dict[str, Any | dict[str, Any]]
361    ) -> Group | None:
362        action, connection = self.matcher.get_group_action(group_id, group_properties)
363        if action == Action.ENROLL:
364            group = Group.objects.create(**group_properties)
365            connection.group = group
366            connection.save()
367            return group
368        elif action in (Action.LINK, Action.AUTH):
369            group = connection.group
370            group.update_attributes(group_properties)
371            connection.save()
372            return group
373
374        return None
375
376    def handle_groups(self) -> bool:
377        self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE]
378        self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
379        self.group_connection_type: GroupSourceConnection = (
380            self.executor.current_stage.group_connection_type
381        )
382        self.matcher = SourceMatcher(self.source, None, self.group_connection_type)
383
384        raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[
385            PLAN_CONTEXT_SOURCE_GROUPS
386        ]
387        groups: list[Group] = []
388
389        for group_id, group_properties in raw_groups.items():
390            group = self.handle_group(group_id, group_properties)
391            if not group:
392                return False
393            groups.append(group)
394
395        with transaction.atomic():
396            self.user.groups.remove(
397                *self.user.groups.filter(groupsourceconnection__source=self.source)
398            )
399            self.user.groups.add(*groups)
400
401        return True
402
403    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
404        """Stage used after the user has been enrolled to sync their groups from source data"""
405        if self.handle_groups():
406            return self.executor.stage_ok()
407        else:
408            return self.executor.stage_invalid("Failed to update groups. Please try again later.")
409
410    def post(self, request: HttpRequest) -> HttpResponse:
411        """Wrapper for post requests"""
412        return self.get(request)

Dynamically injected stage which updates the user after enrollment/authentication.

def handle_group( self, group_id: str, group_properties: dict[str, typing.Any | dict[str, typing.Any]]) -> authentik.core.models.Group | None:
359    def handle_group(
360        self, group_id: str, group_properties: dict[str, Any | dict[str, Any]]
361    ) -> Group | None:
362        action, connection = self.matcher.get_group_action(group_id, group_properties)
363        if action == Action.ENROLL:
364            group = Group.objects.create(**group_properties)
365            connection.group = group
366            connection.save()
367            return group
368        elif action in (Action.LINK, Action.AUTH):
369            group = connection.group
370            group.update_attributes(group_properties)
371            connection.save()
372            return group
373
374        return None
def handle_groups(self) -> bool:
376    def handle_groups(self) -> bool:
377        self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE]
378        self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
379        self.group_connection_type: GroupSourceConnection = (
380            self.executor.current_stage.group_connection_type
381        )
382        self.matcher = SourceMatcher(self.source, None, self.group_connection_type)
383
384        raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[
385            PLAN_CONTEXT_SOURCE_GROUPS
386        ]
387        groups: list[Group] = []
388
389        for group_id, group_properties in raw_groups.items():
390            group = self.handle_group(group_id, group_properties)
391            if not group:
392                return False
393            groups.append(group)
394
395        with transaction.atomic():
396            self.user.groups.remove(
397                *self.user.groups.filter(groupsourceconnection__source=self.source)
398            )
399            self.user.groups.add(*groups)
400
401        return True
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
403    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
404        """Stage used after the user has been enrolled to sync their groups from source data"""
405        if self.handle_groups():
406            return self.executor.stage_ok()
407        else:
408            return self.executor.stage_invalid("Failed to update groups. Please try again later.")

Stage used after the user has been enrolled to sync their groups from source data

def post( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
410    def post(self, request: HttpRequest) -> HttpResponse:
411        """Wrapper for post requests"""
412        return self.get(request)

Wrapper for post requests