authentik.flows.planner
Flows Planner
1"""Flows Planner""" 2 3from dataclasses import dataclass, field 4from typing import TYPE_CHECKING, Any 5 6from django.core.cache import cache 7from django.http import HttpRequest, HttpResponse 8from sentry_sdk import start_span 9from sentry_sdk.tracing import Span 10from structlog.stdlib import BoundLogger, get_logger 11 12from authentik.core.models import User 13from authentik.events.models import cleanse_dict 14from authentik.flows.apps import HIST_FLOWS_PLAN_TIME 15from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException 16from authentik.flows.markers import ReevaluateMarker, StageMarker 17from authentik.flows.models import ( 18 Flow, 19 FlowAuthenticationRequirement, 20 FlowDesignation, 21 FlowStageBinding, 22 Stage, 23 in_memory_stage, 24) 25from authentik.lib.config import CONFIG 26from authentik.lib.utils.urls import redirect_with_qs 27from authentik.outposts.models import Outpost 28from authentik.policies.engine import PolicyEngine 29from authentik.root.middleware import ClientIPMiddleware 30 31if TYPE_CHECKING: 32 from authentik.flows.stage import StageView 33 34 35LOGGER = get_logger() 36PLAN_CONTEXT_PENDING_USER = "pending_user" 37PLAN_CONTEXT_SSO = "is_sso" 38PLAN_CONTEXT_REDIRECT = "redirect" 39PLAN_CONTEXT_APPLICATION = "application" 40PLAN_CONTEXT_DEVICE = "device" 41PLAN_CONTEXT_SOURCE = "source" 42PLAN_CONTEXT_OUTPOST = "outpost" 43PLAN_CONTEXT_POST = "goauthentik.io/http/post" 44# Is set by the Flow Planner when a FlowToken was used, and the currently active flow plan 45# was restored. 46PLAN_CONTEXT_IS_RESTORED = "is_restored" 47PLAN_CONTEXT_IS_REDIRECTED = "is_redirected" 48PLAN_CONTEXT_REDIRECT_STAGE_TARGET = "redirect_stage_target" 49CACHE_TIMEOUT = CONFIG.get_int("cache.timeout_flows") 50CACHE_PREFIX = "goauthentik.io/flows/planner/" 51 52 53def cache_key(flow: Flow, user: User | None = None) -> str: 54 """Generate Cache key for flow""" 55 prefix = CACHE_PREFIX + str(flow.pk) 56 if user: 57 prefix += f"#{user.pk}" 58 return prefix 59 60 61@dataclass(slots=True) 62class FlowPlan: 63 """This data-class is the output of a FlowPlanner. It holds a flat list 64 of all Stages that should be run.""" 65 66 flow_pk: str 67 68 bindings: list[FlowStageBinding] = field(default_factory=list) 69 context: dict[str, Any] = field(default_factory=dict) 70 markers: list[StageMarker] = field(default_factory=list) 71 72 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 73 """Append `stage` to the end of the plan, optionally with stage marker""" 74 return self.append(FlowStageBinding(stage=stage), marker) 75 76 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 77 """Append `stage` to the end of the plan, optionally with stage marker""" 78 self.bindings.append(binding) 79 self.markers.append(marker or StageMarker()) 80 81 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 82 """Insert stage into plan, as immediate next stage""" 83 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 84 self.markers.insert(index, marker or StageMarker()) 85 86 def redirect(self, destination: str): 87 """Insert a redirect stage as next stage""" 88 from authentik.flows.stage import RedirectStage 89 90 self.insert_stage(in_memory_stage(RedirectStage, destination=destination)) 91 92 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 93 """Return next pending stage from the bottom of the list""" 94 if not self.has_stages: 95 return None 96 binding = self.bindings[0] 97 marker = self.markers[0] 98 99 if marker.__class__ is not StageMarker: 100 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 101 marked_stage = marker.process(self, binding, http_request) 102 if not marked_stage: 103 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 104 self.bindings.remove(binding) 105 self.markers.remove(marker) 106 if not self.has_stages: 107 return None 108 109 return self.next(http_request) 110 return marked_stage 111 112 def pop(self): 113 """Pop next pending stage from bottom of list""" 114 if not self.markers and not self.bindings: 115 return 116 self.markers.pop(0) 117 self.bindings.pop(0) 118 119 @property 120 def has_stages(self) -> bool: 121 """Check if there are any stages left in this plan""" 122 return len(self.markers) + len(self.bindings) > 0 123 124 def requires_flow_executor( 125 self, 126 allowed_silent_types: list[StageView] | None = None, 127 ): 128 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 129 found_unskippable = True 130 if allowed_silent_types: 131 LOGGER.debug("Checking if we can skip the flow executor...") 132 # Policies applied to the flow have already been evaluated, so we're checking for stages 133 # allow-listed or bindings that require a policy re-eval 134 found_unskippable = False 135 for binding, marker in zip(self.bindings, self.markers, strict=True): 136 if binding.stage.view not in allowed_silent_types: 137 found_unskippable = True 138 if marker and isinstance(marker, ReevaluateMarker): 139 found_unskippable = True 140 LOGGER.debug("Required flow executor status", status=found_unskippable) 141 return found_unskippable 142 143 def to_redirect( 144 self, 145 request: HttpRequest, 146 flow: Flow, 147 next: str | None = None, 148 allowed_silent_types: list[StageView] | None = None, 149 ) -> HttpResponse: 150 """Redirect to the flow executor for this flow plan""" 151 from authentik.flows.views.executor import ( 152 NEXT_ARG_NAME, 153 SESSION_KEY_PLAN, 154 FlowExecutorView, 155 ) 156 157 request.session[SESSION_KEY_PLAN] = self 158 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 159 160 if not requires_flow_executor: 161 # No unskippable stages found, so we can directly return the response of the last stage 162 final_stage: type[StageView] = self.bindings[-1].stage.view 163 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 164 temp_exec.current_stage = self.bindings[-1].stage 165 temp_exec.current_stage_view = final_stage 166 temp_exec.setup(request, flow.slug) 167 stage = final_stage(request=request, executor=temp_exec) 168 response = stage.dispatch(request) 169 # Ensure we clean the flow state we have in the session before we redirect away 170 temp_exec.stage_ok() 171 return response 172 173 get_qs = request.GET.copy() 174 if request.user.is_authenticated and ( 175 # Object-scoped permission or global permission 176 request.user.has_perm("authentik_flows.inspect_flow", flow) 177 or request.user.has_perm("authentik_flows.inspect_flow") 178 ): 179 get_qs["inspector"] = "available" 180 if next: 181 get_qs[NEXT_ARG_NAME] = next 182 183 return redirect_with_qs( 184 "authentik_core:if-flow", 185 get_qs, 186 flow_slug=flow.slug, 187 ) 188 189 190class FlowPlanner: 191 """Execute all policies to plan out a flat list of all Stages 192 that should be applied.""" 193 194 use_cache: bool 195 allow_empty_flows: bool 196 197 flow: Flow 198 199 _logger: BoundLogger 200 201 def __init__(self, flow: Flow): 202 self.use_cache = True 203 self.allow_empty_flows = False 204 self.flow = flow 205 self._logger = get_logger().bind(flow_slug=flow.slug) 206 207 def _check_authentication(self, request: HttpRequest, context: dict[str, Any]): 208 """Check the flow's authentication level is matched by `request`""" 209 if ( 210 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED 211 and not request.user.is_authenticated 212 ): 213 raise FlowNonApplicableException() 214 if ( 215 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 216 and request.user.is_authenticated 217 ): 218 raise FlowNonApplicableException() 219 if ( 220 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER 221 and not request.user.is_superuser 222 ): 223 raise FlowNonApplicableException() 224 if ( 225 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT 226 and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None 227 ): 228 raise FlowNonApplicableException() 229 outpost_user = ClientIPMiddleware.get_outpost_user(request) 230 if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST: 231 if not outpost_user: 232 raise FlowNonApplicableException() 233 if outpost_user: 234 outpost = Outpost.objects.filter( 235 # TODO: Since Outpost and user are not directly connected, we have to look up a user 236 # like this. This should ideally by in authentik/outposts/models.py 237 pk=outpost_user.username.replace("ak-outpost-", "") 238 ).first() 239 if outpost: 240 return { 241 PLAN_CONTEXT_OUTPOST: { 242 "instance": outpost, 243 } 244 } 245 return {} 246 247 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 248 """Check each of the flows' policies, check policies for each stage with PolicyBinding 249 and return ordered list""" 250 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 251 span: Span 252 span.set_data("flow", self.flow) 253 span.set_data("request", request) 254 255 self._logger.debug( 256 "f(plan): starting planning process", 257 ) 258 context = default_context or {} 259 # Bit of a workaround here, if there is a pending user set in the default context 260 # we use that user for our cache key to make sure they don't get the generic response 261 if context and PLAN_CONTEXT_PENDING_USER in context: 262 user = context[PLAN_CONTEXT_PENDING_USER] 263 else: 264 user = request.user 265 266 context.update(self._check_authentication(request, context)) 267 # First off, check the flow's direct policy bindings 268 # to make sure the user even has access to the flow 269 engine = PolicyEngine(self.flow, user, request) 270 engine.use_cache = self.use_cache 271 span.set_data("context", cleanse_dict(context)) 272 engine.request.context.update(context) 273 engine.build() 274 result = engine.result 275 if not result.passing: 276 exc = FlowNonApplicableException() 277 exc.policy_result = result 278 raise exc 279 # User is passing so far, check if we have a cached plan 280 cached_plan_key = cache_key(self.flow, user) 281 cached_plan = cache.get(cached_plan_key, None) 282 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 283 if cached_plan and self.use_cache: 284 self._logger.debug( 285 "f(plan): taking plan from cache", 286 key=cached_plan_key, 287 ) 288 # Reset the context as this isn't factored into caching 289 cached_plan.context = context 290 return cached_plan 291 self._logger.debug( 292 "f(plan): building plan", 293 ) 294 plan = self._build_plan(user, request, context) 295 if self.use_cache: 296 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 297 if not plan.bindings and not self.allow_empty_flows: 298 raise EmptyFlowException() 299 return plan 300 301 def _build_plan( 302 self, 303 user: User, 304 request: HttpRequest, 305 default_context: dict[str, Any] | None, 306 ) -> FlowPlan: 307 """Build flow plan by checking each stage in their respective 308 order and checking the applied policies""" 309 with ( 310 start_span( 311 op="authentik.flow.planner.build_plan", 312 name=self.flow.slug, 313 ) as span, 314 HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(), 315 ): 316 span: Span 317 span.set_data("flow", self.flow) 318 span.set_data("user", user) 319 span.set_data("request", request) 320 321 plan = FlowPlan(flow_pk=self.flow.pk.hex) 322 if default_context: 323 plan.context = default_context 324 # Check Flow policies 325 bindings = list( 326 FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order") 327 ) 328 stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings]) 329 for binding in bindings: 330 binding: FlowStageBinding 331 stage = [stage for stage in stages if stage.pk == binding.stage_id][0] 332 marker = StageMarker() 333 if binding.evaluate_on_plan: 334 self._logger.debug( 335 "f(plan): evaluating on plan", 336 stage=stage, 337 ) 338 engine = PolicyEngine(binding, user, request) 339 engine.use_cache = self.use_cache 340 engine.request.context["flow_plan"] = plan 341 engine.request.context.update(plan.context) 342 engine.build() 343 if engine.passing: 344 self._logger.debug( 345 "f(plan): stage passing", 346 stage=stage, 347 ) 348 else: 349 stage = None 350 else: 351 self._logger.debug( 352 "f(plan): not evaluating on plan", 353 stage=stage, 354 ) 355 if binding.re_evaluate_policies and stage: 356 self._logger.debug( 357 "f(plan): stage has re-evaluate marker", 358 stage=stage, 359 ) 360 marker = ReevaluateMarker(binding=binding) 361 if stage: 362 plan.append(binding, marker) 363 self._logger.debug( 364 "f(plan): finished building", 365 ) 366 return plan
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_PENDING_USER =
'pending_user'
PLAN_CONTEXT_SSO =
'is_sso'
PLAN_CONTEXT_REDIRECT =
'redirect'
PLAN_CONTEXT_APPLICATION =
'application'
PLAN_CONTEXT_DEVICE =
'device'
PLAN_CONTEXT_SOURCE =
'source'
PLAN_CONTEXT_OUTPOST =
'outpost'
PLAN_CONTEXT_POST =
'goauthentik.io/http/post'
PLAN_CONTEXT_IS_RESTORED =
'is_restored'
PLAN_CONTEXT_IS_REDIRECTED =
'is_redirected'
PLAN_CONTEXT_REDIRECT_STAGE_TARGET =
'redirect_stage_target'
CACHE_TIMEOUT =
300
CACHE_PREFIX =
'goauthentik.io/flows/planner/'
def
cache_key( flow: authentik.flows.models.Flow, user: authentik.core.models.User | None = None) -> str:
54def cache_key(flow: Flow, user: User | None = None) -> str: 55 """Generate Cache key for flow""" 56 prefix = CACHE_PREFIX + str(flow.pk) 57 if user: 58 prefix += f"#{user.pk}" 59 return prefix
Generate Cache key for flow
@dataclass(slots=True)
class
FlowPlan:
62@dataclass(slots=True) 63class FlowPlan: 64 """This data-class is the output of a FlowPlanner. It holds a flat list 65 of all Stages that should be run.""" 66 67 flow_pk: str 68 69 bindings: list[FlowStageBinding] = field(default_factory=list) 70 context: dict[str, Any] = field(default_factory=dict) 71 markers: list[StageMarker] = field(default_factory=list) 72 73 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 74 """Append `stage` to the end of the plan, optionally with stage marker""" 75 return self.append(FlowStageBinding(stage=stage), marker) 76 77 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 78 """Append `stage` to the end of the plan, optionally with stage marker""" 79 self.bindings.append(binding) 80 self.markers.append(marker or StageMarker()) 81 82 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 83 """Insert stage into plan, as immediate next stage""" 84 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 85 self.markers.insert(index, marker or StageMarker()) 86 87 def redirect(self, destination: str): 88 """Insert a redirect stage as next stage""" 89 from authentik.flows.stage import RedirectStage 90 91 self.insert_stage(in_memory_stage(RedirectStage, destination=destination)) 92 93 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 94 """Return next pending stage from the bottom of the list""" 95 if not self.has_stages: 96 return None 97 binding = self.bindings[0] 98 marker = self.markers[0] 99 100 if marker.__class__ is not StageMarker: 101 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 102 marked_stage = marker.process(self, binding, http_request) 103 if not marked_stage: 104 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 105 self.bindings.remove(binding) 106 self.markers.remove(marker) 107 if not self.has_stages: 108 return None 109 110 return self.next(http_request) 111 return marked_stage 112 113 def pop(self): 114 """Pop next pending stage from bottom of list""" 115 if not self.markers and not self.bindings: 116 return 117 self.markers.pop(0) 118 self.bindings.pop(0) 119 120 @property 121 def has_stages(self) -> bool: 122 """Check if there are any stages left in this plan""" 123 return len(self.markers) + len(self.bindings) > 0 124 125 def requires_flow_executor( 126 self, 127 allowed_silent_types: list[StageView] | None = None, 128 ): 129 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 130 found_unskippable = True 131 if allowed_silent_types: 132 LOGGER.debug("Checking if we can skip the flow executor...") 133 # Policies applied to the flow have already been evaluated, so we're checking for stages 134 # allow-listed or bindings that require a policy re-eval 135 found_unskippable = False 136 for binding, marker in zip(self.bindings, self.markers, strict=True): 137 if binding.stage.view not in allowed_silent_types: 138 found_unskippable = True 139 if marker and isinstance(marker, ReevaluateMarker): 140 found_unskippable = True 141 LOGGER.debug("Required flow executor status", status=found_unskippable) 142 return found_unskippable 143 144 def to_redirect( 145 self, 146 request: HttpRequest, 147 flow: Flow, 148 next: str | None = None, 149 allowed_silent_types: list[StageView] | None = None, 150 ) -> HttpResponse: 151 """Redirect to the flow executor for this flow plan""" 152 from authentik.flows.views.executor import ( 153 NEXT_ARG_NAME, 154 SESSION_KEY_PLAN, 155 FlowExecutorView, 156 ) 157 158 request.session[SESSION_KEY_PLAN] = self 159 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 160 161 if not requires_flow_executor: 162 # No unskippable stages found, so we can directly return the response of the last stage 163 final_stage: type[StageView] = self.bindings[-1].stage.view 164 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 165 temp_exec.current_stage = self.bindings[-1].stage 166 temp_exec.current_stage_view = final_stage 167 temp_exec.setup(request, flow.slug) 168 stage = final_stage(request=request, executor=temp_exec) 169 response = stage.dispatch(request) 170 # Ensure we clean the flow state we have in the session before we redirect away 171 temp_exec.stage_ok() 172 return response 173 174 get_qs = request.GET.copy() 175 if request.user.is_authenticated and ( 176 # Object-scoped permission or global permission 177 request.user.has_perm("authentik_flows.inspect_flow", flow) 178 or request.user.has_perm("authentik_flows.inspect_flow") 179 ): 180 get_qs["inspector"] = "available" 181 if next: 182 get_qs[NEXT_ARG_NAME] = next 183 184 return redirect_with_qs( 185 "authentik_core:if-flow", 186 get_qs, 187 flow_slug=flow.slug, 188 )
This data-class is the output of a FlowPlanner. It holds a flat list of all Stages that should be run.
FlowPlan( flow_pk: str, bindings: list[authentik.flows.models.FlowStageBinding] = <factory>, context: dict[str, typing.Any] = <factory>, markers: list[authentik.flows.markers.StageMarker] = <factory>)
bindings: list[authentik.flows.models.FlowStageBinding]
markers: list[authentik.flows.markers.StageMarker]
def
append_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None):
73 def append_stage(self, stage: Stage, marker: StageMarker | None = None): 74 """Append `stage` to the end of the plan, optionally with stage marker""" 75 return self.append(FlowStageBinding(stage=stage), marker)
Append stage to the end of the plan, optionally with stage marker
def
append( self, binding: authentik.flows.models.FlowStageBinding, marker: authentik.flows.markers.StageMarker | None = None):
77 def append(self, binding: FlowStageBinding, marker: StageMarker | None = None): 78 """Append `stage` to the end of the plan, optionally with stage marker""" 79 self.bindings.append(binding) 80 self.markers.append(marker or StageMarker())
Append stage to the end of the plan, optionally with stage marker
def
insert_stage( self, stage: authentik.flows.models.Stage, marker: authentik.flows.markers.StageMarker | None = None, index=1):
82 def insert_stage(self, stage: Stage, marker: StageMarker | None = None, index=1): 83 """Insert stage into plan, as immediate next stage""" 84 self.bindings.insert(index, FlowStageBinding(stage=stage, order=0)) 85 self.markers.insert(index, marker or StageMarker())
Insert stage into plan, as immediate next stage
def
redirect(self, destination: str):
87 def redirect(self, destination: str): 88 """Insert a redirect stage as next stage""" 89 from authentik.flows.stage import RedirectStage 90 91 self.insert_stage(in_memory_stage(RedirectStage, destination=destination))
Insert a redirect stage as next stage
def
next( self, http_request: django.http.request.HttpRequest | None) -> authentik.flows.models.FlowStageBinding | None:
93 def next(self, http_request: HttpRequest | None) -> FlowStageBinding | None: 94 """Return next pending stage from the bottom of the list""" 95 if not self.has_stages: 96 return None 97 binding = self.bindings[0] 98 marker = self.markers[0] 99 100 if marker.__class__ is not StageMarker: 101 LOGGER.debug("f(plan_inst): stage has marker", binding=binding, marker=marker) 102 marked_stage = marker.process(self, binding, http_request) 103 if not marked_stage: 104 LOGGER.debug("f(plan_inst): marker returned none, next stage", binding=binding) 105 self.bindings.remove(binding) 106 self.markers.remove(marker) 107 if not self.has_stages: 108 return None 109 110 return self.next(http_request) 111 return marked_stage
Return next pending stage from the bottom of the list
def
pop(self):
113 def pop(self): 114 """Pop next pending stage from bottom of list""" 115 if not self.markers and not self.bindings: 116 return 117 self.markers.pop(0) 118 self.bindings.pop(0)
Pop next pending stage from bottom of list
has_stages: bool
120 @property 121 def has_stages(self) -> bool: 122 """Check if there are any stages left in this plan""" 123 return len(self.markers) + len(self.bindings) > 0
Check if there are any stages left in this plan
def
requires_flow_executor(unknown):
125 def requires_flow_executor( 126 self, 127 allowed_silent_types: list[StageView] | None = None, 128 ): 129 # Check if we actually need to show the Flow executor, or if we can jump straight to the end 130 found_unskippable = True 131 if allowed_silent_types: 132 LOGGER.debug("Checking if we can skip the flow executor...") 133 # Policies applied to the flow have already been evaluated, so we're checking for stages 134 # allow-listed or bindings that require a policy re-eval 135 found_unskippable = False 136 for binding, marker in zip(self.bindings, self.markers, strict=True): 137 if binding.stage.view not in allowed_silent_types: 138 found_unskippable = True 139 if marker and isinstance(marker, ReevaluateMarker): 140 found_unskippable = True 141 LOGGER.debug("Required flow executor status", status=found_unskippable) 142 return found_unskippable
def
to_redirect(unknown):
144 def to_redirect( 145 self, 146 request: HttpRequest, 147 flow: Flow, 148 next: str | None = None, 149 allowed_silent_types: list[StageView] | None = None, 150 ) -> HttpResponse: 151 """Redirect to the flow executor for this flow plan""" 152 from authentik.flows.views.executor import ( 153 NEXT_ARG_NAME, 154 SESSION_KEY_PLAN, 155 FlowExecutorView, 156 ) 157 158 request.session[SESSION_KEY_PLAN] = self 159 requires_flow_executor = self.requires_flow_executor(allowed_silent_types) 160 161 if not requires_flow_executor: 162 # No unskippable stages found, so we can directly return the response of the last stage 163 final_stage: type[StageView] = self.bindings[-1].stage.view 164 temp_exec = FlowExecutorView(flow=flow, request=request, plan=self) 165 temp_exec.current_stage = self.bindings[-1].stage 166 temp_exec.current_stage_view = final_stage 167 temp_exec.setup(request, flow.slug) 168 stage = final_stage(request=request, executor=temp_exec) 169 response = stage.dispatch(request) 170 # Ensure we clean the flow state we have in the session before we redirect away 171 temp_exec.stage_ok() 172 return response 173 174 get_qs = request.GET.copy() 175 if request.user.is_authenticated and ( 176 # Object-scoped permission or global permission 177 request.user.has_perm("authentik_flows.inspect_flow", flow) 178 or request.user.has_perm("authentik_flows.inspect_flow") 179 ): 180 get_qs["inspector"] = "available" 181 if next: 182 get_qs[NEXT_ARG_NAME] = next 183 184 return redirect_with_qs( 185 "authentik_core:if-flow", 186 get_qs, 187 flow_slug=flow.slug, 188 )
Redirect to the flow executor for this flow plan
class
FlowPlanner:
191class FlowPlanner: 192 """Execute all policies to plan out a flat list of all Stages 193 that should be applied.""" 194 195 use_cache: bool 196 allow_empty_flows: bool 197 198 flow: Flow 199 200 _logger: BoundLogger 201 202 def __init__(self, flow: Flow): 203 self.use_cache = True 204 self.allow_empty_flows = False 205 self.flow = flow 206 self._logger = get_logger().bind(flow_slug=flow.slug) 207 208 def _check_authentication(self, request: HttpRequest, context: dict[str, Any]): 209 """Check the flow's authentication level is matched by `request`""" 210 if ( 211 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_AUTHENTICATED 212 and not request.user.is_authenticated 213 ): 214 raise FlowNonApplicableException() 215 if ( 216 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED 217 and request.user.is_authenticated 218 ): 219 raise FlowNonApplicableException() 220 if ( 221 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_SUPERUSER 222 and not request.user.is_superuser 223 ): 224 raise FlowNonApplicableException() 225 if ( 226 self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_REDIRECT 227 and context.get(PLAN_CONTEXT_IS_REDIRECTED) is None 228 ): 229 raise FlowNonApplicableException() 230 outpost_user = ClientIPMiddleware.get_outpost_user(request) 231 if self.flow.authentication == FlowAuthenticationRequirement.REQUIRE_OUTPOST: 232 if not outpost_user: 233 raise FlowNonApplicableException() 234 if outpost_user: 235 outpost = Outpost.objects.filter( 236 # TODO: Since Outpost and user are not directly connected, we have to look up a user 237 # like this. This should ideally by in authentik/outposts/models.py 238 pk=outpost_user.username.replace("ak-outpost-", "") 239 ).first() 240 if outpost: 241 return { 242 PLAN_CONTEXT_OUTPOST: { 243 "instance": outpost, 244 } 245 } 246 return {} 247 248 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 249 """Check each of the flows' policies, check policies for each stage with PolicyBinding 250 and return ordered list""" 251 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 252 span: Span 253 span.set_data("flow", self.flow) 254 span.set_data("request", request) 255 256 self._logger.debug( 257 "f(plan): starting planning process", 258 ) 259 context = default_context or {} 260 # Bit of a workaround here, if there is a pending user set in the default context 261 # we use that user for our cache key to make sure they don't get the generic response 262 if context and PLAN_CONTEXT_PENDING_USER in context: 263 user = context[PLAN_CONTEXT_PENDING_USER] 264 else: 265 user = request.user 266 267 context.update(self._check_authentication(request, context)) 268 # First off, check the flow's direct policy bindings 269 # to make sure the user even has access to the flow 270 engine = PolicyEngine(self.flow, user, request) 271 engine.use_cache = self.use_cache 272 span.set_data("context", cleanse_dict(context)) 273 engine.request.context.update(context) 274 engine.build() 275 result = engine.result 276 if not result.passing: 277 exc = FlowNonApplicableException() 278 exc.policy_result = result 279 raise exc 280 # User is passing so far, check if we have a cached plan 281 cached_plan_key = cache_key(self.flow, user) 282 cached_plan = cache.get(cached_plan_key, None) 283 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 284 if cached_plan and self.use_cache: 285 self._logger.debug( 286 "f(plan): taking plan from cache", 287 key=cached_plan_key, 288 ) 289 # Reset the context as this isn't factored into caching 290 cached_plan.context = context 291 return cached_plan 292 self._logger.debug( 293 "f(plan): building plan", 294 ) 295 plan = self._build_plan(user, request, context) 296 if self.use_cache: 297 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 298 if not plan.bindings and not self.allow_empty_flows: 299 raise EmptyFlowException() 300 return plan 301 302 def _build_plan( 303 self, 304 user: User, 305 request: HttpRequest, 306 default_context: dict[str, Any] | None, 307 ) -> FlowPlan: 308 """Build flow plan by checking each stage in their respective 309 order and checking the applied policies""" 310 with ( 311 start_span( 312 op="authentik.flow.planner.build_plan", 313 name=self.flow.slug, 314 ) as span, 315 HIST_FLOWS_PLAN_TIME.labels(flow_slug=self.flow.slug).time(), 316 ): 317 span: Span 318 span.set_data("flow", self.flow) 319 span.set_data("user", user) 320 span.set_data("request", request) 321 322 plan = FlowPlan(flow_pk=self.flow.pk.hex) 323 if default_context: 324 plan.context = default_context 325 # Check Flow policies 326 bindings = list( 327 FlowStageBinding.objects.filter(target__pk=self.flow.pk).order_by("order") 328 ) 329 stages = Stage.objects.filter(flowstagebinding__in=[binding.pk for binding in bindings]) 330 for binding in bindings: 331 binding: FlowStageBinding 332 stage = [stage for stage in stages if stage.pk == binding.stage_id][0] 333 marker = StageMarker() 334 if binding.evaluate_on_plan: 335 self._logger.debug( 336 "f(plan): evaluating on plan", 337 stage=stage, 338 ) 339 engine = PolicyEngine(binding, user, request) 340 engine.use_cache = self.use_cache 341 engine.request.context["flow_plan"] = plan 342 engine.request.context.update(plan.context) 343 engine.build() 344 if engine.passing: 345 self._logger.debug( 346 "f(plan): stage passing", 347 stage=stage, 348 ) 349 else: 350 stage = None 351 else: 352 self._logger.debug( 353 "f(plan): not evaluating on plan", 354 stage=stage, 355 ) 356 if binding.re_evaluate_policies and stage: 357 self._logger.debug( 358 "f(plan): stage has re-evaluate marker", 359 stage=stage, 360 ) 361 marker = ReevaluateMarker(binding=binding) 362 if stage: 363 plan.append(binding, marker) 364 self._logger.debug( 365 "f(plan): finished building", 366 ) 367 return plan
Execute all policies to plan out a flat list of all Stages that should be applied.
FlowPlanner(flow: authentik.flows.models.Flow)
def
plan( self, request: django.http.request.HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan:
248 def plan(self, request: HttpRequest, default_context: dict[str, Any] | None = None) -> FlowPlan: 249 """Check each of the flows' policies, check policies for each stage with PolicyBinding 250 and return ordered list""" 251 with start_span(op="authentik.flow.planner.plan", name=self.flow.slug) as span: 252 span: Span 253 span.set_data("flow", self.flow) 254 span.set_data("request", request) 255 256 self._logger.debug( 257 "f(plan): starting planning process", 258 ) 259 context = default_context or {} 260 # Bit of a workaround here, if there is a pending user set in the default context 261 # we use that user for our cache key to make sure they don't get the generic response 262 if context and PLAN_CONTEXT_PENDING_USER in context: 263 user = context[PLAN_CONTEXT_PENDING_USER] 264 else: 265 user = request.user 266 267 context.update(self._check_authentication(request, context)) 268 # First off, check the flow's direct policy bindings 269 # to make sure the user even has access to the flow 270 engine = PolicyEngine(self.flow, user, request) 271 engine.use_cache = self.use_cache 272 span.set_data("context", cleanse_dict(context)) 273 engine.request.context.update(context) 274 engine.build() 275 result = engine.result 276 if not result.passing: 277 exc = FlowNonApplicableException() 278 exc.policy_result = result 279 raise exc 280 # User is passing so far, check if we have a cached plan 281 cached_plan_key = cache_key(self.flow, user) 282 cached_plan = cache.get(cached_plan_key, None) 283 if self.flow.designation not in [FlowDesignation.STAGE_CONFIGURATION]: 284 if cached_plan and self.use_cache: 285 self._logger.debug( 286 "f(plan): taking plan from cache", 287 key=cached_plan_key, 288 ) 289 # Reset the context as this isn't factored into caching 290 cached_plan.context = context 291 return cached_plan 292 self._logger.debug( 293 "f(plan): building plan", 294 ) 295 plan = self._build_plan(user, request, context) 296 if self.use_cache: 297 cache.set(cache_key(self.flow, user), plan, CACHE_TIMEOUT) 298 if not plan.bindings and not self.allow_empty_flows: 299 raise EmptyFlowException() 300 return plan
Check each of the flows' policies, check policies for each stage with PolicyBinding and return ordered list