authentik.flows.planner
Flows Planner
1"""Flows Planner""" 2 3from dataclasses import dataclass, field 4from typing import TYPE_CHECKING, Any 5 6from django.core.cache import cache 7from django.http import HttpRequest, HttpResponse 8from django.utils.translation import gettext as _ 9from sentry_sdk import start_span 10from sentry_sdk.tracing import Span 11from structlog.stdlib import BoundLogger, get_logger 12 13from authentik.core.models import User 14from authentik.events.models import cleanse_dict 15from authentik.flows.apps import HIST_FLOWS_PLAN_TIME 16from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException 17from authentik.flows.markers import ReevaluateMarker, StageMarker 18from authentik.flows.models import ( 19 Flow, 20 FlowAuthenticationRequirement, 21 FlowDesignation, 22 FlowStageBinding, 23 Stage, 24 in_memory_stage, 25) 26from authentik.lib.config import CONFIG 27from authentik.lib.utils.urls import redirect_with_qs 28from authentik.outposts.models import Outpost 29from authentik.policies.engine import PolicyEngine 30from authentik.policies.types import PolicyResult 31from authentik.root.middleware import ClientIPMiddleware 32 33if TYPE_CHECKING: 34 from authentik.flows.stage import StageView 35 36 37LOGGER = get_logger() 38PLAN_CONTEXT_PENDING_USER = "pending_user" 39PLAN_CONTEXT_SSO = "is_sso" 40PLAN_CONTEXT_REDIRECT = "redirect" 41PLAN_CONTEXT_APPLICATION = "application" 42PLAN_CONTEXT_DEVICE = "device" 43PLAN_CONTEXT_SOURCE = "source" 44PLAN_CONTEXT_OUTPOST = "outpost" 45PLAN_CONTEXT_POST = "goauthentik.io/http/post" 46# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan 47# was restored. 48PLAN_CONTEXT_IS_RESTORED = "is_restored" 49PLAN_CONTEXT_IS_REDIRECTED = "is_redirected" 50PLAN_CONTEXT_REDIRECT_STAGE_TARGET = "redirect_stage_target" 51CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_flows") 52CACHE_PREFIX = "goauthentik.io/flows/planner/" 53 54 55def cache_key(flow: Flow, user: User | None = None) -> str: 56 """Generate Cache key for flow""" 57 prefix = CACHE_PREFIX + str(flow.pk) 58 if user: 59 prefix += f"#{user.pk}" 60 return prefix 61 62 63@dataclass(slots=True) 64class FlowPlan: 65 """This data-class is the output of a FlowPlanner. It holds a flat list 66 of all Stages that should be run.""" 67 68 flow_pk: str 69 70 bindings: list[FlowStageBinding] = field(default_factory=list) 71 context: dict[str, Any] = field(default_factory=dict) 72 markers: list[StageMarker] = field(default_factory=list) 73 74 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 75 """Append `stage` to the end of the plan, optionally with stage marker""" 76 return self.append(FlowStageBinding(stage=stage), marker) 77 78 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 79 """Append `stage` to the end of the plan, optionally with stage marker""" 80 self.bindings.append(binding) 81 self.markers.append(marker or StageMarker()) 82 83 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 84 """Insert stage into plan, as immediate next stage""" 85 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 86 self.markers.insert(index, marker or StageMarker()) 87 88 def redirect(self, destination: str): 89 """Insert a redirect stage as next stage""" 90 from authentik.flows.stage import RedirectStage 91 92 self.insert_stage(in_memory_stage(RedirectStage, destination=destination)) 93 94 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 95 """Return next pending stage from the bottom of the list""" 96 if not self.has_stages: 97 return None 98 binding = self.bindings[0] 99 marker = self.markers[0] 100 101 if marker.__class__ is not StageMarker: 102 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 103 marked_stage = marker.process(self, binding, http_request) 104 if not marked_stage: 105 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 106 self.bindings.remove(binding) 107 self.markers.remove(marker) 108 if not self.has_stages: 109 return None 110 111 return self.next(http_request) 112 return marked_stage 113 114 def pop(self): 115 """Pop next pending stage from bottom of list""" 116 if not self.markers and not self.bindings: 117 return 118 self.markers.pop(0) 119 self.bindings.pop(0) 120 121 @property 122 def has_stages(self) -> bool: 123 """Check if there are any stages left in this plan""" 124 return len(self.markers) + len(self.bindings) > 0 125 126 def requires_flow_executor( 127 self, 128 allowed_silent_types: list[StageView] | None = None, 129 ): 130 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 131 found_unskippable = True 132 if allowed_silent_types: 133 LOGGER.debug("Checking if we can skip the flow executor...") 134 # Policies applied to the flow have already been evaluated, so we're checking for stages 135 # allow-listed or bindings that require a policy re-eval 136 found_unskippable = False 137 for binding, marker in zip(self.bindings, self.markers, strict=True): 138 if binding.stage.view not in allowed_silent_types: 139 found_unskippable = True 140 if marker and isinstance(marker, ReevaluateMarker): 141 found_unskippable = True 142 LOGGER.debug("Required flow executor status", status=found_unskippable) 143 return found_unskippable 144 145 def to_redirect( 146 self, 147 request: HttpRequest, 148 flow: Flow, 149 next: str | None = None, 150 allowed_silent_types: list[StageView] | None = None, 151 ) -> HttpResponse: 152 """Redirect to the flow executor for this flow plan""" 153 from authentik.flows.views.executor import ( 154 NEXT_ARG_NAME, 155 SESSION_KEY_PLAN, 156 FlowExecutorView, 157 ) 158 159 request.session[SESSION_KEY_PLAN] = self 160 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 161 162 if not requires_flow_executor: 163 # No unskippable stages found, so we can directly return the response of the last stage 164 final_stage: type[StageView] = self.bindings[-1].stage.view 165 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 166 temp_exec.current_stage = self.bindings[-1].stage 167 temp_exec.current_stage_view = final_stage 168 temp_exec.setup(request, flow.slug) 169 stage = final_stage(request=request, executor=temp_exec) 170 response = stage.dispatch(request) 171 # Ensure we clean the flow state we have in the session before we redirect away 172 temp_exec.stage_ok() 173 return response 174 175 get_qs = request.GET.copy() 176 if request.user.is_authenticated and ( 177 # Object-scoped permission or global permission 178 request.user.has_perm("authentik_flows.inspect_flow", flow) 179 or request.user.has_perm("authentik_flows.inspect_flow") 180 ): 181 get_qs["inspector"] = "available" 182 if next: 183 get_qs[NEXT_ARG_NAME] = next 184 185 return redirect_with_qs( 186 "authentik_core:if-flow", 187 get_qs, 188 flow_slug=flow.slug, 189 ) 190 191 192class FlowPlanner: 193 """Execute all policies to plan out a flat list of all Stages 194 that should be applied.""" 195 196 use_cache: bool 197 allow_empty_flows: bool 198 199 flow: Flow 200 201 _logger: BoundLogger 202 203 def __init__(self, flow: Flow): 204 self.use_cache = True 205 self.allow_empty_flows = False 206 self.flow = flow 207 self._logger = get_logger().bind(flow_slug=flow.slug) 208 209 def _check_authentication(self, request: HttpRequest, context: dict[str, Any]): 210 """Check the flow's authentication level is matched by `request`""" 211 if ( 212 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED 213 and not request.user.is_authenticated 214 ): 215 raise FlowNonApplicableException() 216 if ( 217 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 218 and request.user.is_authenticated 219 ): 220 raise FlowNonApplicableException() 221 if ( 222 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER 223 and not request.user.is_superuser 224 ): 225 raise FlowNonApplicableException() 226 if ( 227 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT 228 and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None 229 ): 230 raise FlowNonApplicableException() 231 if ( 232 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_TOKEN 233 and context.get(PLAN_CONTEXT_IS_RESTORED) is None 234 ): 235 raise FlowNonApplicableException( 236 PolicyResult( 237 False, _("This link is invalid or has expired. Please request a new one.") 238 ) 239 ) 240 outpost_user = ClientIPMiddleware.get_outpost_user(request) 241 if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST: 242 if not outpost_user: 243 raise FlowNonApplicableException() 244 if outpost_user: 245 outpost = Outpost.objects.filter( 246 # TODO: Since Outpost and user are not directly connected, we have to look up a user 247 # like this. This should ideally by in authentik/outposts/models.py 248 pk=outpost_user.username.replace("ak-outpost-", "") 249 ).first() 250 if outpost: 251 return { 252 PLAN_CONTEXT_OUTPOST: { 253 "instance": outpost, 254 } 255 } 256 return {} 257 258 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 259 """Check each of the flows' policies, check policies for each stage with PolicyBinding 260 and return ordered list""" 261 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 262 span: Span 263 span.set_data("flow", self.flow) 264 span.set_data("request", request) 265 266 self._logger.debug( 267 "f(plan): starting planning process", 268 ) 269 context = default_context or {} 270 # Bit of a workaround here, if there is a pending user set in the default context 271 # we use that user for our cache key to make sure they don't get the generic response 272 if context and PLAN_CONTEXT_PENDING_USER in context: 273 user = context[PLAN_CONTEXT_PENDING_USER] 274 else: 275 user = request.user 276 277 context.update(self._check_authentication(request, context)) 278 # First off, check the flow's direct policy bindings 279 # to make sure the user even has access to the flow 280 engine = PolicyEngine(self.flow, user, request) 281 engine.use_cache = self.use_cache 282 span.set_data("context", cleanse_dict(context)) 283 engine.request.context.update(context) 284 engine.build() 285 result = engine.result 286 if not result.passing: 287 raise FlowNonApplicableException(result) 288 # User is passing so far, check if we have a cached plan 289 cached_plan_key = cache_key(self.flow, user) 290 cached_plan = cache.get(cached_plan_key, None) 291 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 292 if cached_plan and self.use_cache: 293 self._logger.debug( 294 "f(plan): taking plan from cache", 295 key=cached_plan_key, 296 ) 297 # Reset the context as this isn't factored into caching 298 cached_plan.context = context 299 return cached_plan 300 self._logger.debug( 301 "f(plan): building plan", 302 ) 303 plan = self._build_plan(user, request, context) 304 if self.use_cache: 305 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 306 if not plan.bindings and not self.allow_empty_flows: 307 raise EmptyFlowException() 308 return plan 309 310 def _build_plan( 311 self, 312 user: User, 313 request: HttpRequest, 314 default_context: dict[str, Any] | None, 315 ) -> FlowPlan: 316 """Build flow plan by checking each stage in their respective 317 order and checking the applied policies""" 318 with ( 319 start_span( 320 op="authentik.flow.planner.build_plan", 321 name=self.flow.slug, 322 ) as span, 323 HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(), 324 ): 325 span: Span 326 span.set_data("flow", self.flow) 327 span.set_data("user", user) 328 span.set_data("request", request) 329 330 plan = FlowPlan(flow_pk=self.flow.pk.hex) 331 if default_context: 332 plan.context = default_context 333 # Check Flow policies 334 bindings = list( 335 FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order") 336 ) 337 stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings]) 338 for binding in bindings: 339 binding: FlowStageBinding 340 stage = [stage for stage in stages if stage.pk == binding.stage_id][0] 341 marker = StageMarker() 342 if binding.evaluate_on_plan: 343 self._logger.debug( 344 "f(plan): evaluating on plan", 345 stage=stage, 346 ) 347 engine = PolicyEngine(binding, user, request) 348 engine.use_cache = self.use_cache 349 engine.request.context["flow_plan"] = plan 350 engine.request.context.update(plan.context) 351 engine.build() 352 if engine.passing: 353 self._logger.debug( 354 "f(plan): stage passing", 355 stage=stage, 356 ) 357 else: 358 stage = None 359 else: 360 self._logger.debug( 361 "f(plan): not evaluating on plan", 362 stage=stage, 363 ) 364 if binding.re_evaluate_policies and stage: 365 self._logger.debug( 366 "f(plan): stage has re-evaluate marker", 367 stage=stage, 368 ) 369 marker = ReevaluateMarker(binding=binding) 370 if stage: 371 plan.append(binding, marker) 372 self._logger.debug( 373 "f(plan): finished building", 374 ) 375 return plan
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PENDING_USER =
'pending_user'
PLAN_CONTEXT_SSO =
'is_sso'
PLAN_CONTEXT_REDIRECT =
'redirect'
PLAN_CONTEXT_APPLICATION =
'application'
PLAN_CONTEXT_DEVICE =
'device'
PLAN_CONTEXT_SOURCE =
'source'
PLAN_CONTEXT_OUTPOST =
'outpost'
PLAN_CONTEXT_POST =
'goauthentik.io/http/post'
PLAN_CONTEXT_IS_RESTORED =
'is_restored'
PLAN_CONTEXT_IS_REDIRECTED =
'is_redirected'
PLAN_CONTEXT_REDIRECT_STAGE_TARGET =
'redirect_stage_target'
CACHE_TIMEOUT =
300
CACHE_PREFIX =
'goauthentik.io/flows/planner/'
def
cache_key( flow: authentik.flows.models.Flow, user: authentik.core.models.User | None = None) -> str:
56def cache_key(flow: Flow, user: User | None = None) -> str: 57 """Generate Cache key for flow""" 58 prefix = CACHE_PREFIX + str(flow.pk) 59 if user: 60 prefix += f"#{user.pk}" 61 return prefix
Generate Cache key for flow
@dataclass(slots=True)
class
FlowPlan:
64@dataclass(slots=True) 65class FlowPlan: 66 """This data-class is the output of a FlowPlanner. It holds a flat list 67 of all Stages that should be run.""" 68 69 flow_pk: str 70 71 bindings: list[FlowStageBinding] = field(default_factory=list) 72 context: dict[str, Any] = field(default_factory=dict) 73 markers: list[StageMarker] = field(default_factory=list) 74 75 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 76 """Append `stage` to the end of the plan, optionally with stage marker""" 77 return self.append(FlowStageBinding(stage=stage), marker) 78 79 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 80 """Append `stage` to the end of the plan, optionally with stage marker""" 81 self.bindings.append(binding) 82 self.markers.append(marker or StageMarker()) 83 84 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 85 """Insert stage into plan, as immediate next stage""" 86 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 87 self.markers.insert(index, marker or StageMarker()) 88 89 def redirect(self, destination: str): 90 """Insert a redirect stage as next stage""" 91 from authentik.flows.stage import RedirectStage 92 93 self.insert_stage(in_memory_stage(RedirectStage, destination=destination)) 94 95 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 96 """Return next pending stage from the bottom of the list""" 97 if not self.has_stages: 98 return None 99 binding = self.bindings[0] 100 marker = self.markers[0] 101 102 if marker.__class__ is not StageMarker: 103 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 104 marked_stage = marker.process(self, binding, http_request) 105 if not marked_stage: 106 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 107 self.bindings.remove(binding) 108 self.markers.remove(marker) 109 if not self.has_stages: 110 return None 111 112 return self.next(http_request) 113 return marked_stage 114 115 def pop(self): 116 """Pop next pending stage from bottom of list""" 117 if not self.markers and not self.bindings: 118 return 119 self.markers.pop(0) 120 self.bindings.pop(0) 121 122 @property 123 def has_stages(self) -> bool: 124 """Check if there are any stages left in this plan""" 125 return len(self.markers) + len(self.bindings) > 0 126 127 def requires_flow_executor( 128 self, 129 allowed_silent_types: list[StageView] | None = None, 130 ): 131 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 132 found_unskippable = True 133 if allowed_silent_types: 134 LOGGER.debug("Checking if we can skip the flow executor...") 135 # Policies applied to the flow have already been evaluated, so we're checking for stages 136 # allow-listed or bindings that require a policy re-eval 137 found_unskippable = False 138 for binding, marker in zip(self.bindings, self.markers, strict=True): 139 if binding.stage.view not in allowed_silent_types: 140 found_unskippable = True 141 if marker and isinstance(marker, ReevaluateMarker): 142 found_unskippable = True 143 LOGGER.debug("Required flow executor status", status=found_unskippable) 144 return found_unskippable 145 146 def to_redirect( 147 self, 148 request: HttpRequest, 149 flow: Flow, 150 next: str | None = None, 151 allowed_silent_types: list[StageView] | None = None, 152 ) -> HttpResponse: 153 """Redirect to the flow executor for this flow plan""" 154 from authentik.flows.views.executor import ( 155 NEXT_ARG_NAME, 156 SESSION_KEY_PLAN, 157 FlowExecutorView, 158 ) 159 160 request.session[SESSION_KEY_PLAN] = self 161 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 162 163 if not requires_flow_executor: 164 # No unskippable stages found, so we can directly return the response of the last stage 165 final_stage: type[StageView] = self.bindings[-1].stage.view 166 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 167 temp_exec.current_stage = self.bindings[-1].stage 168 temp_exec.current_stage_view = final_stage 169 temp_exec.setup(request, flow.slug) 170 stage = final_stage(request=request, executor=temp_exec) 171 response = stage.dispatch(request) 172 # Ensure we clean the flow state we have in the session before we redirect away 173 temp_exec.stage_ok() 174 return response 175 176 get_qs = request.GET.copy() 177 if request.user.is_authenticated and ( 178 # Object-scoped permission or global permission 179 request.user.has_perm("authentik_flows.inspect_flow", flow) 180 or request.user.has_perm("authentik_flows.inspect_flow") 181 ): 182 get_qs["inspector"] = "available" 183 if next: 184 get_qs[NEXT_ARG_NAME] = next 185 186 return redirect_with_qs( 187 "authentik_core:if-flow", 188 get_qs, 189 flow_slug=flow.slug, 190 )
This data-class is the output of a FlowPlanner. It holds a flat list of all Stages that should be run.
FlowPlan( flow_pk: str, bindings: list[authentik.flows.models.FlowStageBinding] = <factory>, context: dict[str, typing.Any] = <factory>, markers: list[authentik.flows.markers.StageMarker] = <factory>)
bindings: list[authentik.flows.models.FlowStageBinding]
markers: list[authentik.flows.markers.StageMarker]
def
append_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None):
75 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 76 """Append `stage` to the end of the plan, optionally with stage marker""" 77 return self.append(FlowStageBinding(stage=stage), marker)
Append stage to the end of the plan, optionally with stage marker
def
append( self, binding: authentik.flows.models.FlowStageBinding, marker: authentik.flows.markers.StageMarker | None = None):
79 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 80 """Append `stage` to the end of the plan, optionally with stage marker""" 81 self.bindings.append(binding) 82 self.markers.append(marker or StageMarker())
Append stage to the end of the plan, optionally with stage marker
def
insert_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None, index=1):
84 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 85 """Insert stage into plan, as immediate next stage""" 86 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 87 self.markers.insert(index, marker or StageMarker())
Insert stage into plan, as immediate next stage
def
redirect(self, destination: str):
89 def redirect(self, destination: str): 90 """Insert a redirect stage as next stage""" 91 from authentik.flows.stage import RedirectStage 92 93 self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
Insert a redirect stage as next stage
def
next( self, http_request: django.http.request.HttpRequest | None) -> authentik.flows.models.FlowStageBinding | None:
95 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 96 """Return next pending stage from the bottom of the list""" 97 if not self.has_stages: 98 return None 99 binding = self.bindings[0] 100 marker = self.markers[0] 101 102 if marker.__class__ is not StageMarker: 103 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 104 marked_stage = marker.process(self, binding, http_request) 105 if not marked_stage: 106 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 107 self.bindings.remove(binding) 108 self.markers.remove(marker) 109 if not self.has_stages: 110 return None 111 112 return self.next(http_request) 113 return marked_stage
Return next pending stage from the bottom of the list
def
pop(self):
115 def pop(self): 116 """Pop next pending stage from bottom of list""" 117 if not self.markers and not self.bindings: 118 return 119 self.markers.pop(0) 120 self.bindings.pop(0)
Pop next pending stage from bottom of list
has_stages: bool
122 @property 123 def has_stages(self) -> bool: 124 """Check if there are any stages left in this plan""" 125 return len(self.markers) + len(self.bindings) > 0
Check if there are any stages left in this plan
def
requires_flow_executor(unknown):
127 def requires_flow_executor( 128 self, 129 allowed_silent_types: list[StageView] | None = None, 130 ): 131 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 132 found_unskippable = True 133 if allowed_silent_types: 134 LOGGER.debug("Checking if we can skip the flow executor...") 135 # Policies applied to the flow have already been evaluated, so we're checking for stages 136 # allow-listed or bindings that require a policy re-eval 137 found_unskippable = False 138 for binding, marker in zip(self.bindings, self.markers, strict=True): 139 if binding.stage.view not in allowed_silent_types: 140 found_unskippable = True 141 if marker and isinstance(marker, ReevaluateMarker): 142 found_unskippable = True 143 LOGGER.debug("Required flow executor status", status=found_unskippable) 144 return found_unskippable
def
to_redirect(unknown):
146 def to_redirect( 147 self, 148 request: HttpRequest, 149 flow: Flow, 150 next: str | None = None, 151 allowed_silent_types: list[StageView] | None = None, 152 ) -> HttpResponse: 153 """Redirect to the flow executor for this flow plan""" 154 from authentik.flows.views.executor import ( 155 NEXT_ARG_NAME, 156 SESSION_KEY_PLAN, 157 FlowExecutorView, 158 ) 159 160 request.session[SESSION_KEY_PLAN] = self 161 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 162 163 if not requires_flow_executor: 164 # No unskippable stages found, so we can directly return the response of the last stage 165 final_stage: type[StageView] = self.bindings[-1].stage.view 166 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 167 temp_exec.current_stage = self.bindings[-1].stage 168 temp_exec.current_stage_view = final_stage 169 temp_exec.setup(request, flow.slug) 170 stage = final_stage(request=request, executor=temp_exec) 171 response = stage.dispatch(request) 172 # Ensure we clean the flow state we have in the session before we redirect away 173 temp_exec.stage_ok() 174 return response 175 176 get_qs = request.GET.copy() 177 if request.user.is_authenticated and ( 178 # Object-scoped permission or global permission 179 request.user.has_perm("authentik_flows.inspect_flow", flow) 180 or request.user.has_perm("authentik_flows.inspect_flow") 181 ): 182 get_qs["inspector"] = "available" 183 if next: 184 get_qs[NEXT_ARG_NAME] = next 185 186 return redirect_with_qs( 187 "authentik_core:if-flow", 188 get_qs, 189 flow_slug=flow.slug, 190 )
Redirect to the flow executor for this flow plan
class
FlowPlanner:
193class FlowPlanner: 194 """Execute all policies to plan out a flat list of all Stages 195 that should be applied.""" 196 197 use_cache: bool 198 allow_empty_flows: bool 199 200 flow: Flow 201 202 _logger: BoundLogger 203 204 def __init__(self, flow: Flow): 205 self.use_cache = True 206 self.allow_empty_flows = False 207 self.flow = flow 208 self._logger = get_logger().bind(flow_slug=flow.slug) 209 210 def _check_authentication(self, request: HttpRequest, context: dict[str, Any]): 211 """Check the flow's authentication level is matched by `request`""" 212 if ( 213 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED 214 and not request.user.is_authenticated 215 ): 216 raise FlowNonApplicableException() 217 if ( 218 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 219 and request.user.is_authenticated 220 ): 221 raise FlowNonApplicableException() 222 if ( 223 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER 224 and not request.user.is_superuser 225 ): 226 raise FlowNonApplicableException() 227 if ( 228 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT 229 and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None 230 ): 231 raise FlowNonApplicableException() 232 if ( 233 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_TOKEN 234 and context.get(PLAN_CONTEXT_IS_RESTORED) is None 235 ): 236 raise FlowNonApplicableException( 237 PolicyResult( 238 False, _("This link is invalid or has expired. Please request a new one.") 239 ) 240 ) 241 outpost_user = ClientIPMiddleware.get_outpost_user(request) 242 if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST: 243 if not outpost_user: 244 raise FlowNonApplicableException() 245 if outpost_user: 246 outpost = Outpost.objects.filter( 247 # TODO: Since Outpost and user are not directly connected, we have to look up a user 248 # like this. This should ideally by in authentik/outposts/models.py 249 pk=outpost_user.username.replace("ak-outpost-", "") 250 ).first() 251 if outpost: 252 return { 253 PLAN_CONTEXT_OUTPOST: { 254 "instance": outpost, 255 } 256 } 257 return {} 258 259 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 260 """Check each of the flows' policies, check policies for each stage with PolicyBinding 261 and return ordered list""" 262 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 263 span: Span 264 span.set_data("flow", self.flow) 265 span.set_data("request", request) 266 267 self._logger.debug( 268 "f(plan): starting planning process", 269 ) 270 context = default_context or {} 271 # Bit of a workaround here, if there is a pending user set in the default context 272 # we use that user for our cache key to make sure they don't get the generic response 273 if context and PLAN_CONTEXT_PENDING_USER in context: 274 user = context[PLAN_CONTEXT_PENDING_USER] 275 else: 276 user = request.user 277 278 context.update(self._check_authentication(request, context)) 279 # First off, check the flow's direct policy bindings 280 # to make sure the user even has access to the flow 281 engine = PolicyEngine(self.flow, user, request) 282 engine.use_cache = self.use_cache 283 span.set_data("context", cleanse_dict(context)) 284 engine.request.context.update(context) 285 engine.build() 286 result = engine.result 287 if not result.passing: 288 raise FlowNonApplicableException(result) 289 # User is passing so far, check if we have a cached plan 290 cached_plan_key = cache_key(self.flow, user) 291 cached_plan = cache.get(cached_plan_key, None) 292 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 293 if cached_plan and self.use_cache: 294 self._logger.debug( 295 "f(plan): taking plan from cache", 296 key=cached_plan_key, 297 ) 298 # Reset the context as this isn't factored into caching 299 cached_plan.context = context 300 return cached_plan 301 self._logger.debug( 302 "f(plan): building plan", 303 ) 304 plan = self._build_plan(user, request, context) 305 if self.use_cache: 306 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 307 if not plan.bindings and not self.allow_empty_flows: 308 raise EmptyFlowException() 309 return plan 310 311 def _build_plan( 312 self, 313 user: User, 314 request: HttpRequest, 315 default_context: dict[str, Any] | None, 316 ) -> FlowPlan: 317 """Build flow plan by checking each stage in their respective 318 order and checking the applied policies""" 319 with ( 320 start_span( 321 op="authentik.flow.planner.build_plan", 322 name=self.flow.slug, 323 ) as span, 324 HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(), 325 ): 326 span: Span 327 span.set_data("flow", self.flow) 328 span.set_data("user", user) 329 span.set_data("request", request) 330 331 plan = FlowPlan(flow_pk=self.flow.pk.hex) 332 if default_context: 333 plan.context = default_context 334 # Check Flow policies 335 bindings = list( 336 FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order") 337 ) 338 stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings]) 339 for binding in bindings: 340 binding: FlowStageBinding 341 stage = [stage for stage in stages if stage.pk == binding.stage_id][0] 342 marker = StageMarker() 343 if binding.evaluate_on_plan: 344 self._logger.debug( 345 "f(plan): evaluating on plan", 346 stage=stage, 347 ) 348 engine = PolicyEngine(binding, user, request) 349 engine.use_cache = self.use_cache 350 engine.request.context["flow_plan"] = plan 351 engine.request.context.update(plan.context) 352 engine.build() 353 if engine.passing: 354 self._logger.debug( 355 "f(plan): stage passing", 356 stage=stage, 357 ) 358 else: 359 stage = None 360 else: 361 self._logger.debug( 362 "f(plan): not evaluating on plan", 363 stage=stage, 364 ) 365 if binding.re_evaluate_policies and stage: 366 self._logger.debug( 367 "f(plan): stage has re-evaluate marker", 368 stage=stage, 369 ) 370 marker = ReevaluateMarker(binding=binding) 371 if stage: 372 plan.append(binding, marker) 373 self._logger.debug( 374 "f(plan): finished building", 375 ) 376 return plan
Execute all policies to plan out a flat list of all Stages that should be applied.
FlowPlanner(flow: authentik.flows.models.Flow)
def
plan( self, request: django.http.request.HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
259 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 260 """Check each of the flows' policies, check policies for each stage with PolicyBinding 261 and return ordered list""" 262 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 263 span: Span 264 span.set_data("flow", self.flow) 265 span.set_data("request", request) 266 267 self._logger.debug( 268 "f(plan): starting planning process", 269 ) 270 context = default_context or {} 271 # Bit of a workaround here, if there is a pending user set in the default context 272 # we use that user for our cache key to make sure they don't get the generic response 273 if context and PLAN_CONTEXT_PENDING_USER in context: 274 user = context[PLAN_CONTEXT_PENDING_USER] 275 else: 276 user = request.user 277 278 context.update(self._check_authentication(request, context)) 279 # First off, check the flow's direct policy bindings 280 # to make sure the user even has access to the flow 281 engine = PolicyEngine(self.flow, user, request) 282 engine.use_cache = self.use_cache 283 span.set_data("context", cleanse_dict(context)) 284 engine.request.context.update(context) 285 engine.build() 286 result = engine.result 287 if not result.passing: 288 raise FlowNonApplicableException(result) 289 # User is passing so far, check if we have a cached plan 290 cached_plan_key = cache_key(self.flow, user) 291 cached_plan = cache.get(cached_plan_key, None) 292 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 293 if cached_plan and self.use_cache: 294 self._logger.debug( 295 "f(plan): taking plan from cache", 296 key=cached_plan_key, 297 ) 298 # Reset the context as this isn't factored into caching 299 cached_plan.context = context 300 return cached_plan 301 self._logger.debug( 302 "f(plan): building plan", 303 ) 304 plan = self._build_plan(user, request, context) 305 if self.use_cache: 306 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 307 if not plan.bindings and not self.allow_empty_flows: 308 raise EmptyFlowException() 309 return plan
Check each of the flows' policies, check policies for each stage with PolicyBinding and return ordered list