authentik.flows.planner

Flows Planner

  1"""Flows Planner"""
  2
  3from dataclasses import dataclass, field
  4from typing import TYPE_CHECKING, Any
  5
  6from django.core.cache import cache
  7from django.http import HttpRequest, HttpResponse
  8from sentry_sdk import start_span
  9from sentry_sdk.tracing import Span
 10from structlog.stdlib import BoundLogger, get_logger
 11
 12from authentik.core.models import User
 13from authentik.events.models import cleanse_dict
 14from authentik.flows.apps import HIST_FLOWS_PLAN_TIME
 15from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 16from authentik.flows.markers import ReevaluateMarker, StageMarker
 17from authentik.flows.models import (
 18    Flow,
 19    FlowAuthenticationRequirement,
 20    FlowDesignation,
 21    FlowStageBinding,
 22    Stage,
 23    in_memory_stage,
 24)
 25from authentik.lib.config import CONFIG
 26from authentik.lib.utils.urls import redirect_with_qs
 27from authentik.outposts.models import Outpost
 28from authentik.policies.engine import PolicyEngine
 29from authentik.root.middleware import ClientIPMiddleware
 30
 31if TYPE_CHECKING:
 32    from authentik.flows.stage import StageView
 33
 34
 35LOGGER = get_logger()
 36PLAN_CONTEXT_PENDING_USER = "pending_user"
 37PLAN_CONTEXT_SSO = "is_sso"
 38PLAN_CONTEXT_REDIRECT = "redirect"
 39PLAN_CONTEXT_APPLICATION = "application"
 40PLAN_CONTEXT_DEVICE = "device"
 41PLAN_CONTEXT_SOURCE = "source"
 42PLAN_CONTEXT_OUTPOST = "outpost"
 43PLAN_CONTEXT_POST = "goauthentik.io/http/post"
 44# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan
 45# was restored.
 46PLAN_CONTEXT_IS_RESTORED = "is_restored"
 47PLAN_CONTEXT_IS_REDIRECTED = "is_redirected"
 48PLAN_CONTEXT_REDIRECT_STAGE_TARGET = "redirect_stage_target"
 49CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_flows")
 50CACHE_PREFIX = "goauthentik.io/flows/planner/"
 51
 52
 53def cache_key(flow: Flow, user: User | None = None) -> str:
 54    """Generate Cache key for flow"""
 55    prefix = CACHE_PREFIX + str(flow.pk)
 56    if user:
 57        prefix += f"#{user.pk}"
 58    return prefix
 59
 60
 61@dataclass(slots=True)
 62class FlowPlan:
 63    """This data-class is the output of a FlowPlanner. It holds a flat list
 64    of all Stages that should be run."""
 65
 66    flow_pk: str
 67
 68    bindings: list[FlowStageBinding] = field(default_factory=list)
 69    context: dict[str, Any] = field(default_factory=dict)
 70    markers: list[StageMarker] = field(default_factory=list)
 71
 72    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
 73        """Append `stage` to the end of the plan, optionally with stage marker"""
 74        return self.append(FlowStageBinding(stage=stage), marker)
 75
 76    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
 77        """Append `stage` to the end of the plan, optionally with stage marker"""
 78        self.bindings.append(binding)
 79        self.markers.append(marker or StageMarker())
 80
 81    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
 82        """Insert stage into plan, as immediate next stage"""
 83        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
 84        self.markers.insert(index, marker or StageMarker())
 85
 86    def redirect(self, destination: str):
 87        """Insert a redirect stage as next stage"""
 88        from authentik.flows.stage import RedirectStage
 89
 90        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
 91
 92    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 93        """Return next pending stage from the bottom of the list"""
 94        if not self.has_stages:
 95            return None
 96        binding = self.bindings[0]
 97        marker = self.markers[0]
 98
 99        if marker.__class__ is not StageMarker:
100            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
101        marked_stage = marker.process(self, binding, http_request)
102        if not marked_stage:
103            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
104            self.bindings.remove(binding)
105            self.markers.remove(marker)
106            if not self.has_stages:
107                return None
108
109            return self.next(http_request)
110        return marked_stage
111
112    def pop(self):
113        """Pop next pending stage from bottom of list"""
114        if not self.markers and not self.bindings:
115            return
116        self.markers.pop(0)
117        self.bindings.pop(0)
118
119    @property
120    def has_stages(self) -> bool:
121        """Check if there are any stages left in this plan"""
122        return len(self.markers) + len(self.bindings) > 0
123
124    def requires_flow_executor(
125        self,
126        allowed_silent_types: list[StageView] | None = None,
127    ):
128        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
129        found_unskippable = True
130        if allowed_silent_types:
131            LOGGER.debug("Checking if we can skip the flow executor...")
132            # Policies applied to the flow have already been evaluated, so we're checking for stages
133            # allow-listed or bindings that require a policy re-eval
134            found_unskippable = False
135            for binding, marker in zip(self.bindings, self.markers, strict=True):
136                if binding.stage.view not in allowed_silent_types:
137                    found_unskippable = True
138                if marker and isinstance(marker, ReevaluateMarker):
139                    found_unskippable = True
140        LOGGER.debug("Required flow executor status", status=found_unskippable)
141        return found_unskippable
142
143    def to_redirect(
144        self,
145        request: HttpRequest,
146        flow: Flow,
147        next: str | None = None,
148        allowed_silent_types: list[StageView] | None = None,
149    ) -> HttpResponse:
150        """Redirect to the flow executor for this flow plan"""
151        from authentik.flows.views.executor import (
152            NEXT_ARG_NAME,
153            SESSION_KEY_PLAN,
154            FlowExecutorView,
155        )
156
157        request.session[SESSION_KEY_PLAN] = self
158        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
159
160        if not requires_flow_executor:
161            # No unskippable stages found, so we can directly return the response of the last stage
162            final_stage: type[StageView] = self.bindings[-1].stage.view
163            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
164            temp_exec.current_stage = self.bindings[-1].stage
165            temp_exec.current_stage_view = final_stage
166            temp_exec.setup(request, flow.slug)
167            stage = final_stage(request=request, executor=temp_exec)
168            response = stage.dispatch(request)
169            # Ensure we clean the flow state we have in the session before we redirect away
170            temp_exec.stage_ok()
171            return response
172
173        get_qs = request.GET.copy()
174        if request.user.is_authenticated and (
175            # Object-scoped permission or global permission
176            request.user.has_perm("authentik_flows.inspect_flow", flow)
177            or request.user.has_perm("authentik_flows.inspect_flow")
178        ):
179            get_qs["inspector"] = "available"
180        if next:
181            get_qs[NEXT_ARG_NAME] = next
182
183        return redirect_with_qs(
184            "authentik_core:if-flow",
185            get_qs,
186            flow_slug=flow.slug,
187        )
188
189
class FlowPlanner:
    """Execute all policies to plan out a flat list of all Stages
    that should be applied."""

    # Whether cached plans may be used/stored; also forwarded to the policy engines
    use_cache: bool
    # When False, plan() raises EmptyFlowException for a plan without bindings
    allow_empty_flows: bool

    flow: Flow

    _logger: BoundLogger

    def __init__(self, flow: Flow):
        """Create a planner for `flow` with caching enabled and empty flows disallowed"""
        self.use_cache = True
        self.allow_empty_flows = False
        self.flow = flow
        self._logger = get_logger().bind(flow_slug=flow.slug)

    def _check_authentication(
        self, request: HttpRequest, context: dict[str, Any]
    ) -> dict[str, Any]:
        """Check the flow's authentication level is matched by `request`.

        Returns extra plan context: `{PLAN_CONTEXT_OUTPOST: {"instance": outpost}}` when
        the request comes from an outpost user whose outpost can be found, otherwise
        an empty dict.

        Raises FlowNonApplicableException when the flow's authentication requirement
        is not met.
        """
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
            and not request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
            and request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER
            and not request.user.is_superuser
        ):
            raise FlowNonApplicableException()
        if (
            self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT
            and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None
        ):
            raise FlowNonApplicableException()
        outpost_user = ClientIPMiddleware.get_outpost_user(request)
        if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST:
            if not outpost_user:
                raise FlowNonApplicableException()
        if outpost_user:
            outpost = Outpost.objects.filter(
                # TODO: Since Outpost and user are not directly connected, we have to look up a user
                # like this. This should ideally be in authentik/outposts/models.py
                pk=outpost_user.username.replace("ak-outpost-", "")
            ).first()
            if outpost:
                return {
                    PLAN_CONTEXT_OUTPOST: {
                        "instance": outpost,
                    }
                }
        return {}

    def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
        """Check each of the flows' policies, check policies for each stage with PolicyBinding
        and return ordered list.

        `default_context`, when given and non-empty, is used (and updated in place) as
        the plan context.

        Raises FlowNonApplicableException when the authentication requirement or the
        flow's own policies reject the request, and EmptyFlowException when the
        resulting plan has no bindings and `allow_empty_flows` is False.
        """
        with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("request", request)

            self._logger.debug(
                "f(plan): starting planning process",
            )
            context = default_context or {}
            # Bit of a workaround here, if there is a pending user set in the default context
            # we use that user for our cache key to make sure they don't get the generic response
            if PLAN_CONTEXT_PENDING_USER in context:
                user = context[PLAN_CONTEXT_PENDING_USER]
            else:
                user = request.user

            context.update(self._check_authentication(request, context))
            # First off, check the flow's direct policy bindings
            # to make sure the user even has access to the flow
            engine = PolicyEngine(self.flow, user, request)
            engine.use_cache = self.use_cache
            span.set_data("context", cleanse_dict(context))
            engine.request.context.update(context)
            engine.build()
            result = engine.result
            if not result.passing:
                exc = FlowNonApplicableException()
                exc.policy_result = result
                raise exc
            # User is passing so far, check if we have a cached plan
            cached_plan_key = cache_key(self.flow, user)
            cached_plan = None
            # Only hit the cache when the result can actually be used
            if self.use_cache and self.flow.designation not in [
                FlowDesignation.STAGE_CONFIGURATION
            ]:
                cached_plan = cache.get(cached_plan_key, None)
            if cached_plan:
                self._logger.debug(
                    "f(plan): taking plan from cache",
                    key=cached_plan_key,
                )
                # Reset the context as this isn't factored into caching
                cached_plan.context = context
                plan = cached_plan
            else:
                self._logger.debug(
                    "f(plan): building plan",
                )
                plan = self._build_plan(user, request, context)
                if self.use_cache:
                    cache.set(cached_plan_key, plan, CACHE_TIMEOUT)
            # Applied to cached plans as well: an empty plan is stored in the cache before
            # this check, so it must not be handed out on a later call without it.
            if not plan.bindings and not self.allow_empty_flows:
                raise EmptyFlowException()
            return plan

    def _build_plan(
        self,
        user: User,
        request: HttpRequest,
        default_context: dict[str, Any] | None,
    ) -> FlowPlan:
        """Build flow plan by checking each stage in their respective
        order and checking the applied policies.

        Bindings with `evaluate_on_plan` are only added when their policies pass;
        bindings with `re_evaluate_policies` are added with a `ReevaluateMarker`.
        """
        with (
            start_span(
                op="authentik.flow.planner.build_plan",
                name=self.flow.slug,
            ) as span,
            HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(),
        ):
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("user", user)
            span.set_data("request", request)

            plan = FlowPlan(flow_pk=self.flow.pk.hex)
            if default_context:
                plan.context = default_context
            # Check Flow policies
            bindings = list(
                FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order")
            )
            # Index the stages once instead of scanning the result for every binding
            stages_by_pk = {
                stage.pk: stage
                for stage in Stage.objects.filter(
                    flowstagebinding__in=[binding.pk for binding in bindings]
                )
            }
            for binding in bindings:
                binding: FlowStageBinding
                stage = stages_by_pk[binding.stage_id]
                marker = StageMarker()
                if binding.evaluate_on_plan:
                    self._logger.debug(
                        "f(plan): evaluating on plan",
                        stage=stage,
                    )
                    engine = PolicyEngine(binding, user, request)
                    engine.use_cache = self.use_cache
                    engine.request.context["flow_plan"] = plan
                    engine.request.context.update(plan.context)
                    engine.build()
                    if engine.passing:
                        self._logger.debug(
                            "f(plan): stage passing",
                            stage=stage,
                        )
                    else:
                        # Policies failed: leave this binding out of the plan
                        stage = None
                else:
                    self._logger.debug(
                        "f(plan): not evaluating on plan",
                        stage=stage,
                    )
                if binding.re_evaluate_policies and stage:
                    self._logger.debug(
                        "f(plan): stage has re-evaluate marker",
                        stage=stage,
                    )
                    marker = ReevaluateMarker(binding=binding)
                if stage:
                    plan.append(binding, marker)
        self._logger.debug(
            "f(plan): finished building",
        )
        return plan
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PENDING_USER = 'pending_user'
PLAN_CONTEXT_SSO = 'is_sso'
PLAN_CONTEXT_REDIRECT = 'redirect'
PLAN_CONTEXT_APPLICATION = 'application'
PLAN_CONTEXT_DEVICE = 'device'
PLAN_CONTEXT_SOURCE = 'source'
PLAN_CONTEXT_OUTPOST = 'outpost'
PLAN_CONTEXT_POST = 'goauthentik.io/http/post'
PLAN_CONTEXT_IS_RESTORED = 'is_restored'
PLAN_CONTEXT_IS_REDIRECTED = 'is_redirected'
PLAN_CONTEXT_REDIRECT_STAGE_TARGET = 'redirect_stage_target'
CACHE_TIMEOUT = 300
CACHE_PREFIX = 'goauthentik.io/flows/planner/'
def cache_key( flow: authentik.flows.models.Flow, user: authentik.core.models.User | None = None) -> str:
def cache_key(flow: Flow, user: User | None = None) -> str:
    """Build the cache key under which a plan for `flow` is stored.

    The key is `CACHE_PREFIX` followed by the flow's primary key. When a truthy
    `user` is passed, `#<user.pk>` is appended to it.
    """
    parts = [CACHE_PREFIX, str(flow.pk)]
    if user:
        parts.append(f"#{user.pk}")
    return "".join(parts)

Generate Cache key for flow

@dataclass(slots=True)
class FlowPlan:
 62@dataclass(slots=True)
 63class FlowPlan:
 64    """This data-class is the output of a FlowPlanner. It holds a flat list
 65    of all Stages that should be run."""
 66
 67    flow_pk: str
 68
 69    bindings: list[FlowStageBinding] = field(default_factory=list)
 70    context: dict[str, Any] = field(default_factory=dict)
 71    markers: list[StageMarker] = field(default_factory=list)
 72
 73    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
 74        """Append `stage` to the end of the plan, optionally with stage marker"""
 75        return self.append(FlowStageBinding(stage=stage), marker)
 76
 77    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
 78        """Append `stage` to the end of the plan, optionally with stage marker"""
 79        self.bindings.append(binding)
 80        self.markers.append(marker or StageMarker())
 81
 82    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
 83        """Insert stage into plan, as immediate next stage"""
 84        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
 85        self.markers.insert(index, marker or StageMarker())
 86
 87    def redirect(self, destination: str):
 88        """Insert a redirect stage as next stage"""
 89        from authentik.flows.stage import RedirectStage
 90
 91        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
 92
 93    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 94        """Return next pending stage from the bottom of the list"""
 95        if not self.has_stages:
 96            return None
 97        binding = self.bindings[0]
 98        marker = self.markers[0]
 99
100        if marker.__class__ is not StageMarker:
101            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
102        marked_stage = marker.process(self, binding, http_request)
103        if not marked_stage:
104            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
105            self.bindings.remove(binding)
106            self.markers.remove(marker)
107            if not self.has_stages:
108                return None
109
110            return self.next(http_request)
111        return marked_stage
112
113    def pop(self):
114        """Pop next pending stage from bottom of list"""
115        if not self.markers and not self.bindings:
116            return
117        self.markers.pop(0)
118        self.bindings.pop(0)
119
120    @property
121    def has_stages(self) -> bool:
122        """Check if there are any stages left in this plan"""
123        return len(self.markers) + len(self.bindings) > 0
124
125    def requires_flow_executor(
126        self,
127        allowed_silent_types: list[StageView] | None = None,
128    ):
129        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
130        found_unskippable = True
131        if allowed_silent_types:
132            LOGGER.debug("Checking if we can skip the flow executor...")
133            # Policies applied to the flow have already been evaluated, so we're checking for stages
134            # allow-listed or bindings that require a policy re-eval
135            found_unskippable = False
136            for binding, marker in zip(self.bindings, self.markers, strict=True):
137                if binding.stage.view not in allowed_silent_types:
138                    found_unskippable = True
139                if marker and isinstance(marker, ReevaluateMarker):
140                    found_unskippable = True
141        LOGGER.debug("Required flow executor status", status=found_unskippable)
142        return found_unskippable
143
144    def to_redirect(
145        self,
146        request: HttpRequest,
147        flow: Flow,
148        next: str | None = None,
149        allowed_silent_types: list[StageView] | None = None,
150    ) -> HttpResponse:
151        """Redirect to the flow executor for this flow plan"""
152        from authentik.flows.views.executor import (
153            NEXT_ARG_NAME,
154            SESSION_KEY_PLAN,
155            FlowExecutorView,
156        )
157
158        request.session[SESSION_KEY_PLAN] = self
159        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
160
161        if not requires_flow_executor:
162            # No unskippable stages found, so we can directly return the response of the last stage
163            final_stage: type[StageView] = self.bindings[-1].stage.view
164            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
165            temp_exec.current_stage = self.bindings[-1].stage
166            temp_exec.current_stage_view = final_stage
167            temp_exec.setup(request, flow.slug)
168            stage = final_stage(request=request, executor=temp_exec)
169            response = stage.dispatch(request)
170            # Ensure we clean the flow state we have in the session before we redirect away
171            temp_exec.stage_ok()
172            return response
173
174        get_qs = request.GET.copy()
175        if request.user.is_authenticated and (
176            # Object-scoped permission or global permission
177            request.user.has_perm("authentik_flows.inspect_flow", flow)
178            or request.user.has_perm("authentik_flows.inspect_flow")
179        ):
180            get_qs["inspector"] = "available"
181        if next:
182            get_qs[NEXT_ARG_NAME] = next
183
184        return redirect_with_qs(
185            "authentik_core:if-flow",
186            get_qs,
187            flow_slug=flow.slug,
188        )

This data-class is the output of a FlowPlanner. It holds a flat list of all Stages that should be run.

FlowPlan( flow_pk: str, bindings: list[authentik.flows.models.FlowStageBinding] = <factory>, context: dict[str, typing.Any] = <factory>, markers: list[authentik.flows.markers.StageMarker] = <factory>)
flow_pk: str
context: dict[str, typing.Any]
def append_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None):
73    def append_stage(self, stage: Stage, marker: StageMarker | None = None):
74        """Append `stage` to the end of the plan, optionally with stage marker"""
75        return self.append(FlowStageBinding(stage=stage), marker)

Append stage to the end of the plan, optionally with stage marker

def append( self, binding: authentik.flows.models.FlowStageBinding, marker: authentik.flows.markers.StageMarker | None = None):
77    def append(self, binding: FlowStageBinding, marker: StageMarker | None = None):
78        """Append `stage` to the end of the plan, optionally with stage marker"""
79        self.bindings.append(binding)
80        self.markers.append(marker or StageMarker())

Append stage to the end of the plan, optionally with stage marker

def insert_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None, index=1):
82    def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1):
83        """Insert stage into plan, as immediate next stage"""
84        self.bindings.insert(index, FlowStageBinding(stage=stage, order=0))
85        self.markers.insert(index, marker or StageMarker())

Insert stage into plan, as immediate next stage

def redirect(self, destination: str):
87    def redirect(self, destination: str):
88        """Insert a redirect stage as next stage"""
89        from authentik.flows.stage import RedirectStage
90
91        self.insert_stage(in_memory_stage(RedirectStage, destination=destination))

Insert a redirect stage as next stage

def next( self, http_request: django.http.request.HttpRequest | None) -> authentik.flows.models.FlowStageBinding | None:
 93    def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None:
 94        """Return next pending stage from the bottom of the list"""
 95        if not self.has_stages:
 96            return None
 97        binding = self.bindings[0]
 98        marker = self.markers[0]
 99
100        if marker.__class__ is not StageMarker:
101            LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker)
102        marked_stage = marker.process(self, binding, http_request)
103        if not marked_stage:
104            LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding)
105            self.bindings.remove(binding)
106            self.markers.remove(marker)
107            if not self.has_stages:
108                return None
109
110            return self.next(http_request)
111        return marked_stage

Return next pending stage from the bottom of the list

def pop(self):
113    def pop(self):
114        """Pop next pending stage from bottom of list"""
115        if not self.markers and not self.bindings:
116            return
117        self.markers.pop(0)
118        self.bindings.pop(0)

Pop next pending stage from bottom of list

has_stages: bool
120    @property
121    def has_stages(self) -> bool:
122        """Check if there are any stages left in this plan"""
123        return len(self.markers) + len(self.bindings) > 0

Check if there are any stages left in this plan

def requires_flow_executor(unknown):
125    def requires_flow_executor(
126        self,
127        allowed_silent_types: list[StageView] | None = None,
128    ):
129        # Check if we actually need to show the Flow executor, or if we can jump straight to the end
130        found_unskippable = True
131        if allowed_silent_types:
132            LOGGER.debug("Checking if we can skip the flow executor...")
133            # Policies applied to the flow have already been evaluated, so we're checking for stages
134            # allow-listed or bindings that require a policy re-eval
135            found_unskippable = False
136            for binding, marker in zip(self.bindings, self.markers, strict=True):
137                if binding.stage.view not in allowed_silent_types:
138                    found_unskippable = True
139                if marker and isinstance(marker, ReevaluateMarker):
140                    found_unskippable = True
141        LOGGER.debug("Required flow executor status", status=found_unskippable)
142        return found_unskippable
def to_redirect(unknown):
144    def to_redirect(
145        self,
146        request: HttpRequest,
147        flow: Flow,
148        next: str | None = None,
149        allowed_silent_types: list[StageView] | None = None,
150    ) -> HttpResponse:
151        """Redirect to the flow executor for this flow plan"""
152        from authentik.flows.views.executor import (
153            NEXT_ARG_NAME,
154            SESSION_KEY_PLAN,
155            FlowExecutorView,
156        )
157
158        request.session[SESSION_KEY_PLAN] = self
159        requires_flow_executor = self.requires_flow_executor(allowed_silent_types)
160
161        if not requires_flow_executor:
162            # No unskippable stages found, so we can directly return the response of the last stage
163            final_stage: type[StageView] = self.bindings[-1].stage.view
164            temp_exec = FlowExecutorView(flow=flow, request=request, plan=self)
165            temp_exec.current_stage = self.bindings[-1].stage
166            temp_exec.current_stage_view = final_stage
167            temp_exec.setup(request, flow.slug)
168            stage = final_stage(request=request, executor=temp_exec)
169            response = stage.dispatch(request)
170            # Ensure we clean the flow state we have in the session before we redirect away
171            temp_exec.stage_ok()
172            return response
173
174        get_qs = request.GET.copy()
175        if request.user.is_authenticated and (
176            # Object-scoped permission or global permission
177            request.user.has_perm("authentik_flows.inspect_flow", flow)
178            or request.user.has_perm("authentik_flows.inspect_flow")
179        ):
180            get_qs["inspector"] = "available"
181        if next:
182            get_qs[NEXT_ARG_NAME] = next
183
184        return redirect_with_qs(
185            "authentik_core:if-flow",
186            get_qs,
187            flow_slug=flow.slug,
188        )

Redirect to the flow executor for this flow plan

class FlowPlanner:
class FlowPlanner:
    """Execute all policies to plan out a flat list of all Stages
    that should be applied.

    A planner is bound to a single flow. `plan()` checks the flow's
    authentication requirement and its directly bound policies, and then
    either takes a plan from the cache or builds a new one from the flow's
    stage bindings.
    """

    # Passed on to every PolicyEngine and controls whether plans are read
    # from / written to the cache; set to True by __init__
    use_cache: bool
    # When False (set by __init__), plan() raises EmptyFlowException for a
    # plan that has no bindings
    allow_empty_flows: bool

    # Flow that is being planned
    flow: Flow

    # Logger bound to the flow's slug
    _logger: BoundLogger

    def __init__(self, flow: Flow):
        """Create a planner for `flow`, with caching on and empty flows rejected."""
        self.use_cache = True
        self.allow_empty_flows = False
        self.flow = flow
        self._logger = get_logger().bind(flow_slug=flow.slug)

    def _check_authentication(
        self, request: HttpRequest, context: dict[str, Any]
    ) -> dict[str, Any]:
        """Check the flow's authentication level is matched by `request`

        Args:
            request: Request whose user (and outpost user, if any) is checked.
            context: Plan context; only read to see whether the
                PLAN_CONTEXT_IS_REDIRECTED key is set.

        Returns:
            Extra plan context: the matching outpost under
            PLAN_CONTEXT_OUTPOST when the request comes from an outpost user
            whose outpost can be found, otherwise an empty dict.

        Raises:
            FlowNonApplicableException: The flow's authentication requirement
                is not met by the request.
        """
        requirement = self.flow.authentication
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED
            and not request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED
            and request.user.is_authenticated
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_SUPERUSER
            and not request.user.is_superuser
        ):
            raise FlowNonApplicableException()
        if (
            requirement == FlowAuthenticationRequirement.REQUIRE_REDIRECT
            and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None
        ):
            raise FlowNonApplicableException()
        outpost_user = ClientIPMiddleware.get_outpost_user(request)
        if requirement == FlowAuthenticationRequirement.REQUIRE_OUTPOST and not outpost_user:
            raise FlowNonApplicableException()
        if outpost_user:
            outpost = Outpost.objects.filter(
                # TODO: Since Outpost and user are not directly connected, we have to look up a
                # user like this. This should ideally be in authentik/outposts/models.py
                # Only the leading prefix is stripped; the remainder is used as the primary key
                pk=outpost_user.username.removeprefix("ak-outpost-")
            ).first()
            if outpost:
                return {
                    PLAN_CONTEXT_OUTPOST: {
                        "instance": outpost,
                    }
                }
        return {}

    def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
        """Check the flow's policies, check the policies of each stage binding
        and return the resulting plan

        Args:
            request: Request the flow is planned for.
            default_context: Initial plan context. If given and non-empty, it is
                updated in place with the result of the authentication check.

        Returns:
            The plan for this flow, either taken from the cache or freshly built.

        Raises:
            FlowNonApplicableException: The authentication requirement or the
                flow's policies are not met; in the latter case the policy
                result is attached as `policy_result`.
            EmptyFlowException: The plan has no bindings and
                `allow_empty_flows` is not set.
        """
        with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("request", request)

            self._logger.debug(
                "f(plan): starting planning process",
            )
            context = default_context or {}
            # Bit of a workaround here, if there is a pending user set in the default context
            # we use that user for our cache key to make sure they don't get the generic response
            if PLAN_CONTEXT_PENDING_USER in context:
                user = context[PLAN_CONTEXT_PENDING_USER]
            else:
                user = request.user

            context.update(self._check_authentication(request, context))
            # First off, check the flow's direct policy bindings
            # to make sure the user even has access to the flow
            engine = PolicyEngine(self.flow, user, request)
            engine.use_cache = self.use_cache
            span.set_data("context", cleanse_dict(context))
            engine.request.context.update(context)
            engine.build()
            result = engine.result
            if not result.passing:
                exc = FlowNonApplicableException()
                exc.policy_result = result
                raise exc
            # User is passing so far, check if we have a cached plan.
            # The cache is only queried when its result can actually be used.
            cached_plan_key = cache_key(self.flow, user)
            plan = None
            if self.use_cache and self.flow.designation != FlowDesignation.STAGE_CONFIGURATION:
                plan = cache.get(cached_plan_key, None)
            if plan:
                self._logger.debug(
                    "f(plan): taking plan from cache",
                    key=cached_plan_key,
                )
                # Reset the context as this isn't factored into caching
                plan.context = context
            else:
                self._logger.debug(
                    "f(plan): building plan",
                )
                plan = self._build_plan(user, request, context)
                if self.use_cache:
                    cache.set(cached_plan_key, plan, CACHE_TIMEOUT)
            # Applied to cached plans as well: an empty plan is stored in the cache
            # before this check, so a cache hit must not bypass it
            if not plan.bindings and not self.allow_empty_flows:
                raise EmptyFlowException()
            return plan

    def _build_plan(
        self,
        user: User,
        request: HttpRequest,
        default_context: dict[str, Any] | None,
    ) -> FlowPlan:
        """Build flow plan by checking each stage in their respective
        order and checking the applied policies

        Args:
            user: User the binding policies are evaluated for.
            request: Request passed to each PolicyEngine.
            default_context: Used as the plan's context when non-empty.

        Returns:
            Plan containing, in binding order, every binding that is either
            not evaluated at plan time or whose policies pass.
        """
        with (
            start_span(
                op="authentik.flow.planner.build_plan",
                name=self.flow.slug,
            ) as span,
            # Time the build under the HIST_FLOWS_PLAN_TIME metric for this flow
            HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(),
        ):
            span: Span
            span.set_data("flow", self.flow)
            span.set_data("user", user)
            span.set_data("request", request)

            plan = FlowPlan(flow_pk=self.flow.pk.hex)
            if default_context:
                plan.context = default_context
            # Check Flow policies
            bindings = list(
                FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order")
            )
            # Fetch all bound stages at once and index them by primary key, so that
            # each binding's stage is a direct lookup instead of a scan over all stages
            stages_by_pk = {
                stage.pk: stage
                for stage in Stage.objects.filter(
                    flowstagebinding__in=[binding.pk for binding in bindings]
                )
            }
            for binding in bindings:
                binding: FlowStageBinding
                stage = stages_by_pk[binding.stage_id]
                if binding.evaluate_on_plan:
                    self._logger.debug(
                        "f(plan): evaluating on plan",
                        stage=stage,
                    )
                    engine = PolicyEngine(binding, user, request)
                    engine.use_cache = self.use_cache
                    engine.request.context["flow_plan"] = plan
                    engine.request.context.update(plan.context)
                    engine.build()
                    if not engine.passing:
                        # Policies deny this binding, leave it out of the plan
                        continue
                    self._logger.debug(
                        "f(plan): stage passing",
                        stage=stage,
                    )
                else:
                    self._logger.debug(
                        "f(plan): not evaluating on plan",
                        stage=stage,
                    )
                if binding.re_evaluate_policies:
                    self._logger.debug(
                        "f(plan): stage has re-evaluate marker",
                        stage=stage,
                    )
                    marker = ReevaluateMarker(binding=binding)
                else:
                    marker = StageMarker()
                plan.append(binding, marker)
        self._logger.debug(
            "f(plan): finished building",
        )
        return plan

Execute all policies to plan out a flat list of all Stages that should be applied.

FlowPlanner(flow: authentik.flows.models.Flow)
def __init__(self, flow: Flow):
    """Create a planner for `flow`, with caching on and empty flows rejected."""
    self.flow = flow
    # Logger carries the flow's slug on every message
    self._logger = get_logger().bind(flow_slug=flow.slug)
    # Initial values of the public switches
    self.allow_empty_flows = False
    self.use_cache = True
use_cache: bool
allow_empty_flows: bool
def plan( self, request: django.http.request.HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
    """Check the flow's policies, check the policies of each stage binding
    and return the resulting plan

    Args:
        request: Request the flow is planned for.
        default_context: Initial plan context. If given and non-empty, it is
            updated in place with the result of the authentication check.

    Returns:
        The plan for this flow, either taken from the cache or freshly built.

    Raises:
        FlowNonApplicableException: The authentication requirement or the
            flow's policies are not met; in the latter case the policy
            result is attached as `policy_result`.
        EmptyFlowException: The plan has no bindings and
            `allow_empty_flows` is not set.
    """
    with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span:
        span: Span
        span.set_data("flow", self.flow)
        span.set_data("request", request)

        self._logger.debug(
            "f(plan): starting planning process",
        )
        context = default_context or {}
        # Bit of a workaround here, if there is a pending user set in the default context
        # we use that user for our cache key to make sure they don't get the generic response
        if PLAN_CONTEXT_PENDING_USER in context:
            user = context[PLAN_CONTEXT_PENDING_USER]
        else:
            user = request.user

        context.update(self._check_authentication(request, context))
        # First off, check the flow's direct policy bindings
        # to make sure the user even has access to the flow
        engine = PolicyEngine(self.flow, user, request)
        engine.use_cache = self.use_cache
        span.set_data("context", cleanse_dict(context))
        engine.request.context.update(context)
        engine.build()
        result = engine.result
        if not result.passing:
            exc = FlowNonApplicableException()
            exc.policy_result = result
            raise exc
        # User is passing so far, check if we have a cached plan.
        # The cache is only queried when its result can actually be used.
        cached_plan_key = cache_key(self.flow, user)
        plan = None
        if self.use_cache and self.flow.designation != FlowDesignation.STAGE_CONFIGURATION:
            plan = cache.get(cached_plan_key, None)
        if plan:
            self._logger.debug(
                "f(plan): taking plan from cache",
                key=cached_plan_key,
            )
            # Reset the context as this isn't factored into caching
            plan.context = context
        else:
            self._logger.debug(
                "f(plan): building plan",
            )
            plan = self._build_plan(user, request, context)
            if self.use_cache:
                cache.set(cached_plan_key, plan, CACHE_TIMEOUT)
        # Applied to cached plans as well: an empty plan is stored in the cache
        # before this check, so a cache hit must not bypass it
        if not plan.bindings and not self.allow_empty_flows:
            raise EmptyFlowException()
        return plan

Check the flow's policies, then the policies bound to each stage via PolicyBinding, and return the resulting ordered plan.