authentik.flows.planner

Flows Planner

  1"""Flows Planner"""
  2
  3from dataclasses import dataclass, field
  4from typing import TYPE_CHECKING, Any
  5
  6from django.core.cache import cache
  7from django.http import HttpRequest, HttpResponse
  8from django.utils.translation import gettext as _
  9from sentry_sdk import start_span
 10from sentry_sdk.tracing import Span
 11from structlog.stdlib import BoundLogger, get_logger
 12
 13from authentik.core.models import User
 14from authentik.events.models import cleanse_dict
 15from authentik.flows.apps import HIST_FLOWS_PLAN_TIME
 16from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 17from authentik.flows.markers import ReevaluateMarker, StageMarker
 18from authentik.flows.models import (
 19    Flow,
 20    FlowAuthenticationRequirement,
 21    FlowDesignation,
 22    FlowStageBinding,
 23    Stage,
 24    in_memory_stage,
 25)
 26from authentik.lib.config import CONFIG
 27from authentik.lib.utils.urls import redirect_with_qs
 28from authentik.outposts.models import Outpost
 29from authentik.policies.engine import PolicyEngine
 30from authentik.policies.types import PolicyResult
 31from authentik.root.middleware import ClientIPMiddleware
 32
 33if TYPE_CHECKING:
 34    from authentik.flows.stage import StageView
 35
 36
LOGGER = get_logger()
# Keys used in the context dict of a flow plan.
# When present in the default context handed to `FlowPlanner.plan`, this user is used
# instead of `request.user` for the policy checks and for the plan's cache key.
PLAN_CONTEXT_PENDING_USER = "pending_user"
PLAN_CONTEXT_SSO = "is_sso"
PLAN_CONTEXT_REDIRECT = "redirect"
PLAN_CONTEXT_APPLICATION = "application"
PLAN_CONTEXT_DEVICE = "device"
PLAN_CONTEXT_SOURCE = "source"
# Filled in by `FlowPlanner._check_authentication` with `{"instance": <Outpost>}` when the
# request comes from an outpost user for which an Outpost object is found.
PLAN_CONTEXT_OUTPOST = "outpost"
PLAN_CONTEXT_POST = "goauthentik.io/http/post"
# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan
# was restored.
PLAN_CONTEXT_IS_RESTORED = "is_restored"
# Has to be present in the context for flows whose authentication requirement is
# REQUIRE_REDIRECT, otherwise planning is rejected.
PLAN_CONTEXT_IS_REDIRECTED = "is_redirected"
PLAN_CONTEXT_REDIRECT_STAGE_TARGET = "redirect_stage_target"
# Timeout passed to `cache.set` when a built plan is stored, read from the
# `cache.timeout_flows` setting.
CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_flows")
# Common prefix of every key produced by `cache_key`.
CACHE_PREFIX = "goauthentik.io/flows/planner/"
 53
 54
 55def cache_key(flow: Flow, user: User | None = None) -> str:
 56    """Generate Cache key for flow"""
 57    prefix = CACHE_PREFIX + str(flow.pk)
 58    if user:
 59        prefix += f"#{user.pk}"
 60    return prefix
 61
 62
 63@dataclass(slots=True)
 64class FlowPlan:
 65    """This data-class is the output of a FlowPlanner. It holds a flat list
 66    of all Stages that should be run."""
 67
 68    flow_pk: str
 69
 70    bindings: list[FlowStageBinding] = field(default_factory=list)
 71    context: dict[str, Any] = field(default_factory=dict)
 72    markers: list[StageMarker] = field(default_factory=list)
 73
 74    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
 75        """Append `stage` to the end of the plan, optionally with stage marker"""
 76        return self.append(FlowStageBinding(stage=stage), marker)
 77
 78    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
 79        """Append `stage` to the end of the plan, optionally with stage marker"""
 80        self.bindings.append(binding)
 81        self.markers.append(marker or StageMarker())
 82
 83    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
 84        """Insert stage into plan, as immediate next stage"""
 85        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
 86        self.markers.insert(index, marker or StageMarker())
 87
 88    def redirect(self, destination: str):
 89        """Insert a redirect stage as next stage"""
 90        from authentik.flows.stage import RedirectStage
 91
 92        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
 93
 94    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 95        """Return next pending stage from the bottom of the list"""
 96        if not self.has_stages:
 97            return None
 98        binding = self.bindings[0]
 99        marker = self.markers[0]
100
101        if marker.__class__ is not StageMarker:
102            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
103        marked_stage = marker.process(self, binding, http_request)
104        if not marked_stage:
105            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
106            self.bindings.remove(binding)
107            self.markers.remove(marker)
108            if not self.has_stages:
109                return None
110
111            return self.next(http_request)
112        return marked_stage
113
114    def pop(self):
115        """Pop next pending stage from bottom of list"""
116        if not self.markers and not self.bindings:
117            return
118        self.markers.pop(0)
119        self.bindings.pop(0)
120
121    @property
122    def has_stages(self) -> bool:
123        """Check if there are any stages left in this plan"""
124        return len(self.markers) + len(self.bindings) > 0
125
126    def requires_flow_executor(
127        self,
128        allowed_silent_types: list[StageView] | None = None,
129    ):
130        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
131        found_unskippable = True
132        if allowed_silent_types:
133            LOGGER.debug("Checking if we can skip the flow executor...")
134            # Policies applied to the flow have already been evaluated, so we're checking for stages
135            # allow-listed or bindings that require a policy re-eval
136            found_unskippable = False
137            for binding, marker in zip(self.bindings, self.markers, strict=True):
138                if binding.stage.view not in allowed_silent_types:
139                    found_unskippable = True
140                if marker and isinstance(marker, ReevaluateMarker):
141                    found_unskippable = True
142        LOGGER.debug("Required flow executor status", status=found_unskippable)
143        return found_unskippable
144
145    def to_redirect(
146        self,
147        request: HttpRequest,
148        flow: Flow,
149        next: str | None = None,
150        allowed_silent_types: list[StageView] | None = None,
151    ) -> HttpResponse:
152        """Redirect to the flow executor for this flow plan"""
153        from authentik.flows.views.executor import (
154            NEXT_ARG_NAME,
155            SESSION_KEY_PLAN,
156            FlowExecutorView,
157        )
158
159        request.session[SESSION_KEY_PLAN] = self
160        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
161
162        if not requires_flow_executor:
163            # No unskippable stages found, so we can directly return the response of the last stage
164            final_stage: type[StageView] = self.bindings[-1].stage.view
165            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
166            temp_exec.current_stage = self.bindings[-1].stage
167            temp_exec.current_stage_view = final_stage
168            temp_exec.setup(request, flow.slug)
169            stage = final_stage(request=request, executor=temp_exec)
170            response = stage.dispatch(request)
171            # Ensure we clean the flow state we have in the session before we redirect away
172            temp_exec.stage_ok()
173            return response
174
175        get_qs = request.GET.copy()
176        if request.user.is_authenticated and (
177            # Object-scoped permission or global permission
178            request.user.has_perm("authentik_flows.inspect_flow", flow)
179            or request.user.has_perm("authentik_flows.inspect_flow")
180        ):
181            get_qs["inspector"] = "available"
182        if next:
183            get_qs[NEXT_ARG_NAME] = next
184
185        return redirect_with_qs(
186            "authentik_core:if-flow",
187            get_qs,
188            flow_slug=flow.slug,
189        )
190
191
class FlowPlanner:
    """Execute all policies to plan out a flat list of all Stages
    that should be applied."""

    # Whether a cached plan may be served; also handed to the policy engines as `use_cache`
    use_cache: bool
    # When False, `plan` raises EmptyFlowException for a plan without any bindings
    allow_empty_flows: bool

    # Flow the plans are built for
    flow: Flow

    # Logger bound to the slug of `flow`
    _logger: BoundLogger

    def __init__(self, flow: Flow):
        self.use_cache = True
        self.allow_empty_flows = False
        self.flow = flow
        self._logger = get_logger().bind(flow_slug=flow.slug)

    def _check_authentication(
        self, request: HttpRequest, context: dict[str, Any]
    ) -> dict[str, Any]:
        """Check the flow's authentication level is matched by `request`

        Returns extra context for the plan: the matching Outpost under
        `PLAN_CONTEXT_OUTPOST` when the request comes from an outpost user for
        which an Outpost is found, otherwise an empty dict.

        Raises FlowNonApplicableException when the requirement configured on
        the flow is not met."""
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
            and not request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
            and request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER
            and not request.user.is_superuser
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT
            and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_TOKEN
            and context.get(PLAN_CONTEXT_IS_RESTORED) is None
        ):
            raise FlowNonApplicableException(
                PolicyResult(
                    False, _("This link is invalid or has expired. Please request a new one.")
                )
            )
        outpost_user = ClientIPMiddleware.get_outpost_user(request)
        if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST:
            if not outpost_user:
                raise FlowNonApplicableException()
        if outpost_user:
            outpost = Outpost.objects.filter(
                # TODO: Since Outpost and user are not directly connected, we have to look up a user
                # like this. This should ideally by in authentik/outposts/models.py
                pk=outpost_user.username.replace("ak-outpost-", "")
            ).first()
            if outpost:
                return {
                    PLAN_CONTEXT_OUTPOST: {
                        "instance": outpost,
                    }
                }
        return {}

    def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
        """Check each of the flows' policies, check policies for each stage with PolicyBinding
        and return ordered list

        `default_context` becomes the context of the returned plan. It is updated
        in place with whatever `_check_authentication` returns.

        Raises FlowNonApplicableException when the authentication requirement or
        the flow's own policies are not met. Raises EmptyFlowException when the
        plan has no bindings and `allow_empty_flows` is False, whether the plan
        was freshly built or taken from the cache."""
        with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("request", request)

            self._logger.debug(
                "f(plan): starting planning process",
            )
            context = default_context or {}
            # Bit of a workaround here, if there is a pending user set in the default context
            # we use that user for our cache key to make sure they don't get the generic response
            if context and PLAN_CONTEXT_PENDING_USER in context:
                user = context[PLAN_CONTEXT_PENDING_USER]
            else:
                user = request.user

            context.update(self._check_authentication(request, context))
            # First off, check the flow's direct policy bindings
            # to make sure the user even has access to the flow
            engine = PolicyEngine(self.flow, user, request)
            engine.use_cache = self.use_cache
            span.set_data("context", cleanse_dict(context))
            engine.request.context.update(context)
            engine.build()
            result = engine.result
            if not result.passing:
                raise FlowNonApplicableException(result)
            # User is passing so far, check if we have a cached plan
            cached_plan_key = cache_key(self.flow, user)
            cache_readable = self.use_cache and self.flow.designation not in [
                FlowDesignation.STAGE_CONFIGURATION
            ]
            # Only query the cache when its result may actually be used
            cached_plan = cache.get(cached_plan_key, None) if cache_readable else None
            if cached_plan:
                self._logger.debug(
                    "f(plan): taking plan from cache",
                    key=cached_plan_key,
                )
                # Empty plans are cached too, so a cached plan needs the same check as a new one
                if not cached_plan.bindings and not self.allow_empty_flows:
                    raise EmptyFlowException()
                # Reset the context as this isn't factored into caching
                cached_plan.context = context
                return cached_plan
            self._logger.debug(
                "f(plan): building plan",
            )
            plan = self._build_plan(user, request, context)
            if self.use_cache:
                cache.set(cached_plan_key, plan, CACHE_TIMEOUT)
            if not plan.bindings and not self.allow_empty_flows:
                raise EmptyFlowException()
            return plan

    def _build_plan(
        self,
        user: User,
        request: HttpRequest,
        default_context: dict[str, Any] | None,
    ) -> FlowPlan:
        """Build flow plan by checking each stage in their respective
        order and checking the applied policies

        Bindings with `evaluate_on_plan` only enter the plan when their policies
        pass for `user`. Bindings with `re_evaluate_policies` are stored with a
        `ReevaluateMarker`, all others with a plain `StageMarker`."""
        with (
            start_span(
                op="authentik.flow.planner.build_plan",
                name=self.flow.slug,
            ) as span,
            HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(),
        ):
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("user", user)
            span.set_data("request", request)

            plan = FlowPlan(flow_pk=self.flow.pk.hex)
            if default_context:
                plan.context = default_context
            # Check Flow policies
            bindings = list(
                FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order")
            )
            stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings])
            # Index the stages once instead of scanning all of them for every binding
            stages_by_pk = {stage.pk: stage for stage in stages}
            for binding in bindings:
                binding: FlowStageBinding
                stage = stages_by_pk[binding.stage_id]
                marker = StageMarker()
                if binding.evaluate_on_plan:
                    self._logger.debug(
                        "f(plan): evaluating on plan",
                        stage=stage,
                    )
                    engine = PolicyEngine(binding, user, request)
                    engine.use_cache = self.use_cache
                    engine.request.context["flow_plan"] = plan
                    engine.request.context.update(plan.context)
                    engine.build()
                    if engine.passing:
                        self._logger.debug(
                            "f(plan): stage passing",
                            stage=stage,
                        )
                    else:
                        # Policies rejected this binding, leave it out of the plan
                        stage = None
                else:
                    self._logger.debug(
                        "f(plan): not evaluating on plan",
                        stage=stage,
                    )
                if binding.re_evaluate_policies and stage:
                    self._logger.debug(
                        "f(plan): stage has re-evaluate marker",
                        stage=stage,
                    )
                    marker = ReevaluateMarker(binding=binding)
                if stage:
                    plan.append(binding, marker)
        self._logger.debug(
            "f(plan): finished building",
        )
        return plan
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PENDING_USER = 'pending_user'
PLAN_CONTEXT_SSO = 'is_sso'
PLAN_CONTEXT_REDIRECT = 'redirect'
PLAN_CONTEXT_APPLICATION = 'application'
PLAN_CONTEXT_DEVICE = 'device'
PLAN_CONTEXT_SOURCE = 'source'
PLAN_CONTEXT_OUTPOST = 'outpost'
PLAN_CONTEXT_POST = 'goauthentik.io/http/post'
PLAN_CONTEXT_IS_RESTORED = 'is_restored'
PLAN_CONTEXT_IS_REDIRECTED = 'is_redirected'
PLAN_CONTEXT_REDIRECT_STAGE_TARGET = 'redirect_stage_target'
CACHE_TIMEOUT = 300
CACHE_PREFIX = 'goauthentik.io/flows/planner/'
def cache_key( flow: authentik.flows.models.Flow, user: authentik.core.models.User | None = None) -> str:
56def cache_key(flow: Flow, user: User | None = None) -> str:
57    """Generate Cache key for flow"""
58    prefix = CACHE_PREFIX + str(flow.pk)
59    if user:
60        prefix += f"#{user.pk}"
61    return prefix

Generate Cache key for flow

@dataclass(slots=True)
class FlowPlan:
 64@dataclass(slots=True)
 65class FlowPlan:
 66    """This data-class is the output of a FlowPlanner. It holds a flat list
 67    of all Stages that should be run."""
 68
 69    flow_pk: str
 70
 71    bindings: list[FlowStageBinding] = field(default_factory=list)
 72    context: dict[str, Any] = field(default_factory=dict)
 73    markers: list[StageMarker] = field(default_factory=list)
 74
 75    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
 76        """Append `stage` to the end of the plan, optionally with stage marker"""
 77        return self.append(FlowStageBinding(stage=stage), marker)
 78
 79    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
 80        """Append `stage` to the end of the plan, optionally with stage marker"""
 81        self.bindings.append(binding)
 82        self.markers.append(marker or StageMarker())
 83
 84    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
 85        """Insert stage into plan, as immediate next stage"""
 86        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
 87        self.markers.insert(index, marker or StageMarker())
 88
 89    def redirect(self, destination: str):
 90        """Insert a redirect stage as next stage"""
 91        from authentik.flows.stage import RedirectStage
 92
 93        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
 94
 95    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 96        """Return next pending stage from the bottom of the list"""
 97        if not self.has_stages:
 98            return None
 99        binding = self.bindings[0]
100        marker = self.markers[0]
101
102        if marker.__class__ is not StageMarker:
103            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
104        marked_stage = marker.process(self, binding, http_request)
105        if not marked_stage:
106            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
107            self.bindings.remove(binding)
108            self.markers.remove(marker)
109            if not self.has_stages:
110                return None
111
112            return self.next(http_request)
113        return marked_stage
114
115    def pop(self):
116        """Pop next pending stage from bottom of list"""
117        if not self.markers and not self.bindings:
118            return
119        self.markers.pop(0)
120        self.bindings.pop(0)
121
122    @property
123    def has_stages(self) -> bool:
124        """Check if there are any stages left in this plan"""
125        return len(self.markers) + len(self.bindings) > 0
126
127    def requires_flow_executor(
128        self,
129        allowed_silent_types: list[StageView] | None = None,
130    ):
131        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
132        found_unskippable = True
133        if allowed_silent_types:
134            LOGGER.debug("Checking if we can skip the flow executor...")
135            # Policies applied to the flow have already been evaluated, so we're checking for stages
136            # allow-listed or bindings that require a policy re-eval
137            found_unskippable = False
138            for binding, marker in zip(self.bindings, self.markers, strict=True):
139                if binding.stage.view not in allowed_silent_types:
140                    found_unskippable = True
141                if marker and isinstance(marker, ReevaluateMarker):
142                    found_unskippable = True
143        LOGGER.debug("Required flow executor status", status=found_unskippable)
144        return found_unskippable
145
146    def to_redirect(
147        self,
148        request: HttpRequest,
149        flow: Flow,
150        next: str | None = None,
151        allowed_silent_types: list[StageView] | None = None,
152    ) -> HttpResponse:
153        """Redirect to the flow executor for this flow plan"""
154        from authentik.flows.views.executor import (
155            NEXT_ARG_NAME,
156            SESSION_KEY_PLAN,
157            FlowExecutorView,
158        )
159
160        request.session[SESSION_KEY_PLAN] = self
161        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
162
163        if not requires_flow_executor:
164            # No unskippable stages found, so we can directly return the response of the last stage
165            final_stage: type[StageView] = self.bindings[-1].stage.view
166            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
167            temp_exec.current_stage = self.bindings[-1].stage
168            temp_exec.current_stage_view = final_stage
169            temp_exec.setup(request, flow.slug)
170            stage = final_stage(request=request, executor=temp_exec)
171            response = stage.dispatch(request)
172            # Ensure we clean the flow state we have in the session before we redirect away
173            temp_exec.stage_ok()
174            return response
175
176        get_qs = request.GET.copy()
177        if request.user.is_authenticated and (
178            # Object-scoped permission or global permission
179            request.user.has_perm("authentik_flows.inspect_flow", flow)
180            or request.user.has_perm("authentik_flows.inspect_flow")
181        ):
182            get_qs["inspector"] = "available"
183        if next:
184            get_qs[NEXT_ARG_NAME] = next
185
186        return redirect_with_qs(
187            "authentik_core:if-flow",
188            get_qs,
189            flow_slug=flow.slug,
190        )

This data-class is the output of a FlowPlanner. It holds a flat list of all Stages that should be run.

FlowPlan( flow_pk: str, bindings: list[authentik.flows.models.FlowStageBinding] = <factory>, context: dict[str, typing.Any] = <factory>, markers: list[authentik.flows.markers.StageMarker] = <factory>)
flow_pk: str
context: dict[str, typing.Any]
def append_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None):
75    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
76        """Append `stage` to the end of the plan, optionally with stage marker"""
77        return self.append(FlowStageBinding(stage=stage), marker)

Append stage to the end of the plan, optionally with stage marker

def append( self, binding: authentik.flows.models.FlowStageBinding, marker: authentik.flows.markers.StageMarker | None = None):
79    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
80        """Append `stage` to the end of the plan, optionally with stage marker"""
81        self.bindings.append(binding)
82        self.markers.append(marker or StageMarker())

Append stage to the end of the plan, optionally with stage marker

def insert_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None, index=1):
84    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
85        """Insert stage into plan, as immediate next stage"""
86        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
87        self.markers.insert(index, marker or StageMarker())

Insert stage into plan, as immediate next stage

def redirect(self, destination: str):
89    def redirect(self, destination: str):
90        """Insert a redirect stage as next stage"""
91        from authentik.flows.stage import RedirectStage
92
93        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))

Insert a redirect stage as next stage

def next( self, http_request: django.http.request.HttpRequest | None) -> authentik.flows.models.FlowStageBinding | None:
 95    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 96        """Return next pending stage from the bottom of the list"""
 97        if not self.has_stages:
 98            return None
 99        binding = self.bindings[0]
100        marker = self.markers[0]
101
102        if marker.__class__ is not StageMarker:
103            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
104        marked_stage = marker.process(self, binding, http_request)
105        if not marked_stage:
106            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
107            self.bindings.remove(binding)
108            self.markers.remove(marker)
109            if not self.has_stages:
110                return None
111
112            return self.next(http_request)
113        return marked_stage

Return next pending stage from the bottom of the list

def pop(self):
115    def pop(self):
116        """Pop next pending stage from bottom of list"""
117        if not self.markers and not self.bindings:
118            return
119        self.markers.pop(0)
120        self.bindings.pop(0)

Pop next pending stage from bottom of list

has_stages: bool
122    @property
123    def has_stages(self) -> bool:
124        """Check if there are any stages left in this plan"""
125        return len(self.markers) + len(self.bindings) > 0

Check if there are any stages left in this plan

def requires_flow_executor( self, allowed_silent_types: list[authentik.flows.stage.StageView] | None = None):
127    def requires_flow_executor(
128        self,
129        allowed_silent_types: list[StageView] | None = None,
130    ):
131        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
132        found_unskippable = True
133        if allowed_silent_types:
134            LOGGER.debug("Checking if we can skip the flow executor...")
135            # Policies applied to the flow have already been evaluated, so we're checking for stages
136            # allow-listed or bindings that require a policy re-eval
137            found_unskippable = False
138            for binding, marker in zip(self.bindings, self.markers, strict=True):
139                if binding.stage.view not in allowed_silent_types:
140                    found_unskippable = True
141                if marker and isinstance(marker, ReevaluateMarker):
142                    found_unskippable = True
143        LOGGER.debug("Required flow executor status", status=found_unskippable)
144        return found_unskippable
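def to_redirect( self, request: django.http.request.HttpRequest, flow: authentik.flows.models.Flow, next: str | None = None, allowed_silent_types: list[authentik.flows.stage.StageView] | None = None) -> django.http.response.HttpResponse: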
146    def to_redirect(
147        self,
148        request: HttpRequest,
149        flow: Flow,
150        next: str | None = None,
151        allowed_silent_types: list[StageView] | None = None,
152    ) -> HttpResponse:
153        """Redirect to the flow executor for this flow plan"""
154        from authentik.flows.views.executor import (
155            NEXT_ARG_NAME,
156            SESSION_KEY_PLAN,
157            FlowExecutorView,
158        )
159
160        request.session[SESSION_KEY_PLAN] = self
161        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
162
163        if not requires_flow_executor:
164            # No unskippable stages found, so we can directly return the response of the last stage
165            final_stage: type[StageView] = self.bindings[-1].stage.view
166            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
167            temp_exec.current_stage = self.bindings[-1].stage
168            temp_exec.current_stage_view = final_stage
169            temp_exec.setup(request, flow.slug)
170            stage = final_stage(request=request, executor=temp_exec)
171            response = stage.dispatch(request)
172            # Ensure we clean the flow state we have in the session before we redirect away
173            temp_exec.stage_ok()
174            return response
175
176        get_qs = request.GET.copy()
177        if request.user.is_authenticated and (
178            # Object-scoped permission or global permission
179            request.user.has_perm("authentik_flows.inspect_flow", flow)
180            or request.user.has_perm("authentik_flows.inspect_flow")
181        ):
182            get_qs["inspector"] = "available"
183        if next:
184            get_qs[NEXT_ARG_NAME] = next
185
186        return redirect_with_qs(
187            "authentik_core:if-flow",
188            get_qs,
189            flow_slug=flow.slug,
190        )

Redirect to the flow executor for this flow plan

class FlowPlanner:
class FlowPlanner:
    """Execute all policies to plan out a flat list of all Stages
    that should be applied."""

    # When True, cached plans may be returned and freshly built plans are stored;
    # the value is also forwarded to every PolicyEngine created while planning
    use_cache: bool
    # When False, `plan` raises EmptyFlowException for a plan without bindings
    allow_empty_flows: bool

    # Flow that is being planned
    flow: Flow

    # Logger bound with the slug of `flow`
    _logger: BoundLogger

    def __init__(self, flow: Flow):
        """Create a planner for `flow` with caching enabled and empty flows disallowed"""
        self.flow = flow
        self.use_cache = True
        self.allow_empty_flows = False
        self._logger = get_logger().bind(flow_slug=flow.slug)

    def _check_authentication(
        self, request: HttpRequest, context: dict[str, Any]
    ) -> dict[str, Any]:
        """Check the flow's authentication level is matched by `request`

        Returns extra plan context: `{PLAN_CONTEXT_OUTPOST: {"instance": outpost}}` when
        `ClientIPMiddleware.get_outpost_user` yields a user and a matching Outpost is
        found, otherwise an empty dict.

        Raises FlowNonApplicableException when the flow's authentication requirement is
        not satisfied by `request` and `context`."""
        requirement = self.flow.authentication
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
            and not request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
            and request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_SUPERUSER
            and not request.user.is_superuser
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_REDIRECT
            and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_TOKEN
            and context.get(PLAN_CONTEXT_IS_RESTORED) is None
        ):
            # Only this case carries a user-facing message
            raise FlowNonApplicableException(
                PolicyResult(
                    False, _("This link is invalid or has expired. Please request a new one.")
                )
            )
        outpost_user = ClientIPMiddleware.get_outpost_user(request)
        if requirement == FlowAuthenticationRequirement.REQUIRE_OUTPOST:
            if not outpost_user:
                raise FlowNonApplicableException()
        # The outpost lookup runs for every requirement, not only REQUIRE_OUTPOST
        if outpost_user:
            outpost = Outpost.objects.filter(
                # TODO: Since Outpost and user are not directly connected, we have to look up a user
                # like this. This should ideally be in authentik/outposts/models.py
                pk=outpost_user.username.replace("ak-outpost-", "")
            ).first()
            if outpost:
                return {
                    PLAN_CONTEXT_OUTPOST: {
                        "instance": outpost,
                    }
                }
        return {}

    def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
        """Check each of the flows' policies, check policies for each stage with PolicyBinding
        and return ordered list

        `default_context`, when given and non-empty, is used as the plan context and is
        updated in place with the result of `_check_authentication`.

        Raises FlowNonApplicableException when the authentication requirement or the
        flow's own policies do not pass, and EmptyFlowException when the built plan has
        no bindings and `allow_empty_flows` is False."""
        with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("request", request)

            self._logger.debug(
                "f(plan): starting planning process",
            )
            context = default_context or {}
            # Bit of a workaround here, if there is a pending user set in the default context
            # we use that user for our cache key to make sure they don't get the generic response
            if context and PLAN_CONTEXT_PENDING_USER in context:
                user = context[PLAN_CONTEXT_PENDING_USER]
            else:
                user = request.user

            context.update(self._check_authentication(request, context))
            # First off, check the flow's direct policy bindings
            # to make sure the user even has access to the flow
            engine = PolicyEngine(self.flow, user, request)
            engine.use_cache = self.use_cache
            span.set_data("context", cleanse_dict(context))
            engine.request.context.update(context)
            engine.build()
            result = engine.result
            if not result.passing:
                raise FlowNonApplicableException(result)
            # User is passing so far, check if we have a cached plan.
            # The key is computed once and shared by the read and the write below
            cached_plan_key = cache_key(self.flow, user)
            # Only hit the cache when its result could actually be used
            if self.use_cache and self.flow.designation not in [
                FlowDesignation.STAGE_CONFIGURATION
            ]:
                cached_plan = cache.get(cached_plan_key, None)
                if cached_plan:
                    self._logger.debug(
                        "f(plan): taking plan from cache",
                        key=cached_plan_key,
                    )
                    # Reset the context as this isn't factored into caching
                    cached_plan.context = context
                    return cached_plan
            self._logger.debug(
                "f(plan): building plan",
            )
            plan = self._build_plan(user, request, context)
            if self.use_cache:
                cache.set(cached_plan_key, plan, CACHE_TIMEOUT)
            # The emptiness check happens after the plan has been written to the cache
            if not plan.bindings and not self.allow_empty_flows:
                raise EmptyFlowException()
            return plan

    def _build_plan(
        self,
        user: User,
        request: HttpRequest,
        default_context: dict[str, Any] | None,
    ) -> FlowPlan:
        """Build flow plan by checking each stage in their respective
        order and checking the applied policies

        Bindings are processed sorted by `order`. A binding with `evaluate_on_plan` is
        left out of the plan when its policies do not pass; a binding with
        `re_evaluate_policies` is appended with a ReevaluateMarker instead of the
        default StageMarker. A non-empty `default_context` becomes the plan's context."""
        with (
            start_span(
                op="authentik.flow.planner.build_plan",
                name=self.flow.slug,
            ) as span,
            HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(),
        ):
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("user", user)
            span.set_data("request", request)

            plan = FlowPlan(flow_pk=self.flow.pk.hex)
            if default_context:
                plan.context = default_context
            # Check Flow policies
            bindings = list(
                FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order")
            )
            stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings])
            # Index the stages once so every binding is resolved with a single lookup
            # instead of rescanning the whole result per binding
            stages_by_pk = {candidate.pk: candidate for candidate in stages}
            for binding in bindings:
                binding: FlowStageBinding
                stage = stages_by_pk[binding.stage_id]
                marker = StageMarker()
                if binding.evaluate_on_plan:
                    self._logger.debug(
                        "f(plan): evaluating on plan",
                        stage=stage,
                    )
                    engine = PolicyEngine(binding, user, request)
                    engine.use_cache = self.use_cache
                    engine.request.context["flow_plan"] = plan
                    engine.request.context.update(plan.context)
                    engine.build()
                    if engine.passing:
                        self._logger.debug(
                            "f(plan): stage passing",
                            stage=stage,
                        )
                    else:
                        # Policies failed at plan time, so this binding is dropped below
                        stage = None
                else:
                    self._logger.debug(
                        "f(plan): not evaluating on plan",
                        stage=stage,
                    )
                if binding.re_evaluate_policies and stage:
                    self._logger.debug(
                        "f(plan): stage has re-evaluate marker",
                        stage=stage,
                    )
                    marker = ReevaluateMarker(binding=binding)
                if stage:
                    plan.append(binding, marker)
        self._logger.debug(
            "f(plan): finished building",
        )
        return plan

Execute all policies to plan out a flat list of all Stages that should be applied.

FlowPlanner(flow: authentik.flows.models.Flow)
204    def __init__(self, flow: Flow):
205        self.use_cache = True
206        self.allow_empty_flows = False
207        self.flow = flow
208        self._logger = get_logger().bind(flow_slug=flow.slug)
use_cache: bool
allow_empty_flows: bool
def plan( self, request: django.http.request.HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
259    def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
260        """Check each of the flows' policies, check policies for each stage with PolicyBinding
261        and return ordered list"""
262        with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
263            span: Span
264            span.set_data("flow", self.flow)
265            span.set_data("request", request)
266
267            self._logger.debug(
268                "f(plan): starting planning process",
269            )
270            context = default_context or {}
271            # Bit of a workaround here, if there is a pending user set in the default context
272            # we use that user for our cache key to make sure they don't get the generic response
273            if context and PLAN_CONTEXT_PENDING_USER in context:
274                user = context[PLAN_CONTEXT_PENDING_USER]
275            else:
276                user = request.user
277
278            context.update(self._check_authentication(request, context))
279            # First off, check the flow's direct policy bindings
280            # to make sure the user even has access to the flow
281            engine = PolicyEngine(self.flow, user, request)
282            engine.use_cache = self.use_cache
283            span.set_data("context", cleanse_dict(context))
284            engine.request.context.update(context)
285            engine.build()
286            result = engine.result
287            if not result.passing:
288                raise FlowNonApplicableException(result)
289            # User is passing so far, check if we have a cached plan
290            cached_plan_key = cache_key(self.flow, user)
291            cached_plan = cache.get(cached_plan_key, None)
292            if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]:
293                if cached_plan and self.use_cache:
294                    self._logger.debug(
295                        "f(plan): taking plan from cache",
296                        key=cached_plan_key,
297                    )
298                    # Reset the context as this isn't factored into caching
299                    cached_plan.context = context
300                    return cached_plan
301            self._logger.debug(
302                "f(plan): building plan",
303            )
304            plan = self._build_plan(user, request, context)
305            if self.use_cache:
306                cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT)
307            if not plan.bindings and not self.allow_empty_flows:
308                raise EmptyFlowException()
309            return plan

Check each of the flows' policies, check policies for each stage with PolicyBinding and return ordered list