authentik.flows.views.executor
authentik multi-stage authentication engine
1"""authentik multi-stage authentication engine""" 2 3from copy import deepcopy 4 5from django.conf import settings 6from django.contrib.auth.mixins import LoginRequiredMixin 7from django.core.cache import cache 8from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect 9from django.http.request import QueryDict 10from django.shortcuts import get_object_or_404, redirect 11from django.template.response import TemplateResponse 12from django.urls import reverse 13from django.utils.decorators import method_decorator 14from django.utils.translation import gettext as _ 15from django.views.decorators.clickjacking import xframe_options_sameorigin 16from django.views.generic import View 17from drf_spectacular.types import OpenApiTypes 18from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer, extend_schema 19from rest_framework.permissions import AllowAny 20from rest_framework.views import APIView 21from sentry_sdk import capture_exception, start_span 22from sentry_sdk.api import set_tag 23from structlog.stdlib import BoundLogger, get_logger 24 25from authentik.brands.models import Brand 26from authentik.events.models import Event, EventAction, cleanse_dict 27from authentik.flows.apps import HIST_FLOW_EXECUTION_STAGE_TIME 28from authentik.flows.challenge import ( 29 Challenge, 30 ChallengeResponse, 31 FlowErrorChallenge, 32 HttpChallengeResponse, 33 RedirectChallenge, 34 ShellChallenge, 35 WithUserInfoChallenge, 36) 37from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException 38from authentik.flows.models import ( 39 ConfigurableStage, 40 Flow, 41 FlowDeniedAction, 42 FlowDesignation, 43 FlowStageBinding, 44 FlowToken, 45 InvalidResponseAction, 46 Stage, 47) 48from authentik.flows.planner import ( 49 CACHE_PREFIX, 50 PLAN_CONTEXT_IS_RESTORED, 51 PLAN_CONTEXT_PENDING_USER, 52 PLAN_CONTEXT_REDIRECT, 53 FlowPlan, 54 FlowPlanner, 55) 56from authentik.flows.stage import AccessDeniedStage, StageView 57from authentik.lib.sentry import SentryIgnoredException, should_ignore_exception 58from authentik.lib.utils.reflection import all_subclasses, class_to_path 59from authentik.lib.utils.urls import is_url_absolute, redirect_with_qs 60from authentik.policies.engine import PolicyEngine 61 62LOGGER = get_logger() 63# Argument used to redirect user after login 64NEXT_ARG_NAME = "next" 65 66SESSION_KEY_PLAN = "authentik/flows/plan" 67SESSION_KEY_GET = "authentik/flows/get" 68SESSION_KEY_POST = "authentik/flows/post" 69SESSION_KEY_HISTORY = "authentik/flows/history" 70QS_KEY_TOKEN = "flow_token" # nosec 71QS_QUERY = "query" 72 73 74def challenge_types(): 75 """This function returns a mapping which contains all subclasses of challenges 76 subclasses of Challenge, and Challenge itself.""" 77 mapping = {} 78 for cls in all_subclasses(Challenge): 79 if cls == WithUserInfoChallenge: 80 continue 81 mapping[cls().fields["component"].default] = cls 82 return mapping 83 84 85def challenge_response_types(): 86 """This function returns a mapping which contains all subclasses of challenges 87 subclasses of Challenge, and Challenge itself.""" 88 mapping = {} 89 for cls in all_subclasses(ChallengeResponse): 90 mapping[cls(stage=None).fields["component"].default] = cls 91 return mapping 92 93 94class InvalidStageError(SentryIgnoredException): 95 """Error raised when a challenge from a stage is not valid""" 96 97 98@method_decorator(xframe_options_sameorigin, name="dispatch") 99class FlowExecutorView(APIView): 100 """Flow executor, passing requests to Stage Views""" 101 102 permission_classes = [AllowAny] 103 104 flow: Flow = None 105 106 plan: FlowPlan | None = None 107 current_binding: FlowStageBinding | None = None 108 current_stage: Stage 109 current_stage_view: View 110 111 _logger: BoundLogger 112 113 def setup(self, request: HttpRequest, flow_slug: str): 114 super().setup(request, flow_slug=flow_slug) 115 if not self.flow: 116 self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug) 117 self._logger = get_logger().bind(flow_slug=flow_slug) 118 set_tag("authentik.flow", self.flow.slug) 119 120 def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse: 121 """When a flow is non-applicable check if user is on the correct domain""" 122 if self.flow.denied_action in [ 123 FlowDeniedAction.CONTINUE, 124 FlowDeniedAction.MESSAGE_CONTINUE, 125 ]: 126 next_url = self.request.GET.get(NEXT_ARG_NAME) 127 if next_url and not is_url_absolute(next_url): 128 self._logger.debug("f(exec): Redirecting to next on fail") 129 return to_stage_response(self.request, redirect(next_url)) 130 if self.flow.denied_action == FlowDeniedAction.CONTINUE: 131 return to_stage_response( 132 self.request, redirect(reverse("authentik_core:root-redirect")) 133 ) 134 return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages)) 135 136 def _check_flow_token(self, key: str) -> FlowPlan | None: 137 """Check if the user is using a flow token to restore a plan""" 138 token: FlowToken | None = FlowToken.objects.filter(key=key).first() 139 if not token: 140 return None 141 plan = None 142 try: 143 plan = token.plan 144 except (AttributeError, EOFError, ImportError, IndexError) as exc: 145 LOGGER.warning("f(exec): Failed to restore token plan", exc=exc) 146 finally: 147 if token.revoke_on_execution: 148 token.delete() 149 if not isinstance(plan, FlowPlan): 150 return None 151 if existing_plan := self.request.session.get(SESSION_KEY_PLAN): 152 plan.context.update(existing_plan.context) 153 plan.context[PLAN_CONTEXT_IS_RESTORED] = token 154 self._logger.debug("f(exec): restored flow plan from token", plan=plan) 155 return plan 156 157 def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse: 158 with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span: 159 span.set_data("authentik Flow", self.flow.slug) 160 get_params = QueryDict(request.GET.get(QS_QUERY, "")) 161 if QS_KEY_TOKEN in get_params: 162 plan = self._check_flow_token(get_params[QS_KEY_TOKEN]) 163 if plan: 164 self.request.session[SESSION_KEY_PLAN] = plan 165 # Early check if there's an active Plan for the current session 166 if SESSION_KEY_PLAN in self.request.session: 167 self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 168 if self.plan.flow_pk != self.flow.pk.hex: 169 self._logger.warning( 170 "f(exec): Found existing plan for other flow, deleting plan", 171 other_flow=self.plan.flow_pk, 172 ) 173 # Existing plan is deleted from session and instance 174 self.plan = None 175 self.cancel() 176 else: 177 self._logger.debug("f(exec): Continuing existing plan") 178 179 # Initial flow request, check if we have an upstream query string passed in 180 request.session[SESSION_KEY_GET] = get_params 181 # Don't check session again as we've either already loaded the plan or we need to plan 182 if not self.plan: 183 request.session[SESSION_KEY_HISTORY] = [] 184 self._logger.debug("f(exec): No active Plan found, initiating planner") 185 try: 186 self.plan = self._initiate_plan() 187 except FlowNonApplicableException as exc: 188 # If we're this flow is for authentication and the user is already authenticated 189 # continue to the next URL 190 if ( 191 self.flow.designation == FlowDesignation.AUTHENTICATION 192 and self.request.user.is_authenticated 193 ): 194 return self._flow_done() 195 self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) 196 return self.handle_invalid_flow(exc) 197 except EmptyFlowException as exc: 198 self._logger.warning("f(exec): Flow is empty", exc=exc) 199 # To match behaviour with loading an empty flow plan from cache, 200 # we don't show an error message here, but rather call _flow_done() 201 return self._flow_done() 202 # We don't save the Plan after getting the next stage 203 # as it hasn't been successfully passed yet 204 try: 205 # This is the first time we actually access any attribute on the selected plan 206 # if the cached plan is from an older version, it might have different attributes 207 # in which case we just delete the plan and invalidate everything 208 next_binding = self.plan.next(self.request) 209 except Exception as exc: # noqa 210 self._logger.warning( 211 "f(exec): found incompatible flow plan, invalidating run", exc=exc 212 ) 213 keys = cache.keys(f"{CACHE_PREFIX}*") 214 cache.delete_many(keys) 215 return self.stage_invalid() 216 if not next_binding: 217 self._logger.debug("f(exec): no more stages, flow is done.") 218 return self._flow_done() 219 self.current_binding = next_binding 220 self.current_stage = next_binding.stage 221 self._logger.debug( 222 "f(exec): Current stage", 223 current_stage=self.current_stage, 224 flow_slug=self.flow.slug, 225 ) 226 try: 227 stage_cls = self.current_stage.view 228 except NotImplementedError as exc: 229 self._logger.debug("Error getting stage type", exc=exc) 230 return self.stage_invalid() 231 self.current_stage_view = stage_cls(self) 232 self.current_stage_view.args = self.args 233 self.current_stage_view.kwargs = self.kwargs 234 self.current_stage_view.request = request 235 try: 236 return super().dispatch(request) 237 except InvalidStageError as exc: 238 return self.stage_invalid(str(exc)) 239 240 def handle_exception(self, exc: Exception) -> HttpResponse: 241 """Handle exception in stage execution""" 242 if settings.DEBUG or settings.TEST: 243 raise exc 244 self._logger.warning(exc) 245 if not should_ignore_exception(exc): 246 capture_exception(exc) 247 Event.new( 248 action=EventAction.SYSTEM_EXCEPTION, 249 message="System exception during flow execution.", 250 ).with_exception(exc).from_http(self.request) 251 challenge = FlowErrorChallenge(self.request, exc) 252 challenge.is_valid(raise_exception=True) 253 return to_stage_response(self.request, HttpChallengeResponse(challenge)) 254 255 @extend_schema( 256 responses={ 257 200: PolymorphicProxySerializer( 258 component_name="ChallengeTypes", 259 serializers=challenge_types, 260 resource_type_field_name="component", 261 ), 262 }, 263 request=OpenApiTypes.NONE, 264 parameters=[ 265 OpenApiParameter( 266 name="query", 267 location=OpenApiParameter.QUERY, 268 required=True, 269 description="Querystring as received", 270 type=OpenApiTypes.STR, 271 ) 272 ], 273 operation_id="flows_executor_get", 274 ) 275 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 276 """Get the next pending challenge from the currently active flow.""" 277 class_path = class_to_path(self.current_stage_view.__class__) 278 self._logger.debug( 279 "f(exec): Passing GET", 280 view_class=class_path, 281 stage=self.current_stage, 282 ) 283 try: 284 with ( 285 start_span( 286 op="authentik.flow.executor.stage", 287 name=class_path, 288 ) as span, 289 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 290 method=request.method.upper(), 291 stage_type=class_path, 292 ).time(), 293 ): 294 span.set_data("Method", request.method.upper()) 295 span.set_data("authentik Stage", self.current_stage_view) 296 span.set_data("authentik Flow", self.flow.slug) 297 stage_response = self.current_stage_view.dispatch(request) 298 return to_stage_response(request, stage_response) 299 except Exception as exc: # noqa 300 return self.handle_exception(exc) 301 302 @extend_schema( 303 responses={ 304 200: PolymorphicProxySerializer( 305 component_name="ChallengeTypes", 306 serializers=challenge_types, 307 resource_type_field_name="component", 308 ), 309 }, 310 request=PolymorphicProxySerializer( 311 component_name="FlowChallengeResponse", 312 serializers=challenge_response_types, 313 resource_type_field_name="component", 314 ), 315 parameters=[ 316 OpenApiParameter( 317 name="query", 318 location=OpenApiParameter.QUERY, 319 required=True, 320 description="Querystring as received", 321 type=OpenApiTypes.STR, 322 ) 323 ], 324 operation_id="flows_executor_solve", 325 ) 326 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 327 """Solve the previously retrieved challenge and advanced to the next stage.""" 328 class_path = class_to_path(self.current_stage_view.__class__) 329 self._logger.debug( 330 "f(exec): Passing POST", 331 view_class=class_path, 332 stage=self.current_stage, 333 ) 334 try: 335 with ( 336 start_span( 337 op="authentik.flow.executor.stage", 338 name=class_path, 339 ) as span, 340 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 341 method=request.method.upper(), 342 stage_type=class_path, 343 ).time(), 344 ): 345 span.set_data("Method", request.method.upper()) 346 span.set_data("authentik Stage", self.current_stage_view) 347 span.set_data("authentik Flow", self.flow.slug) 348 stage_response = self.current_stage_view.dispatch(request) 349 return to_stage_response(request, stage_response) 350 except Exception as exc: # noqa 351 return self.handle_exception(exc) 352 353 def _initiate_plan(self) -> FlowPlan: 354 planner = FlowPlanner(self.flow) 355 plan = planner.plan(self.request) 356 self.request.session[SESSION_KEY_PLAN] = plan 357 try: 358 # Call the has_stages getter to check that 359 # there are no issues with the class we might've gotten 360 # from the cache. If there are errors, just delete all cached flows 361 _ = plan.has_stages 362 except Exception: # noqa 363 keys = cache.keys(f"{CACHE_PREFIX}*") 364 cache.delete_many(keys) 365 return self._initiate_plan() 366 return plan 367 368 def restart_flow(self, keep_context=False) -> HttpResponse: 369 """Restart the currently active flow, optionally keeping the current context""" 370 planner = FlowPlanner(self.flow) 371 planner.use_cache = False 372 default_context = None 373 if keep_context: 374 default_context = self.plan.context 375 try: 376 plan = planner.plan(self.request, default_context) 377 except FlowNonApplicableException as exc: 378 self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc) 379 return self.handle_invalid_flow(exc) 380 self.request.session[SESSION_KEY_PLAN] = plan 381 kwargs = self.kwargs 382 kwargs.update({"flow_slug": self.flow.slug}) 383 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 384 385 def _flow_done(self) -> HttpResponse: 386 """User Successfully passed all stages""" 387 # Since this is wrapped by the ExecutorShell, the next argument is saved in the session 388 # extract the next param before cancel as that cleans it 389 if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context: 390 # The context `redirect` variable can only be set by 391 # an expression policy or authentik itself, so we don't 392 # check if its an absolute URL or a relative one 393 self.cancel() 394 return to_stage_response( 395 self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT)) 396 ) 397 next_param = self.request.session.get(SESSION_KEY_GET, {}).get( 398 NEXT_ARG_NAME, "authentik_core:root-redirect" 399 ) 400 self.cancel() 401 if next_param and not is_url_absolute(next_param): 402 return to_stage_response(self.request, redirect_with_qs(next_param)) 403 return to_stage_response( 404 self.request, self.stage_invalid(error_message=_("Invalid next URL")) 405 ) 406 407 def stage_ok(self) -> HttpResponse: 408 """Callback called by stages upon successful completion. 409 Persists updated plan and context to session.""" 410 self._logger.debug( 411 "f(exec): Stage ok", 412 stage_class=class_to_path(self.current_stage_view.__class__), 413 ) 414 if isinstance(self.current_stage_view, StageView): 415 self.current_stage_view.cleanup() 416 self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan)) 417 self.plan.pop() 418 self.request.session[SESSION_KEY_PLAN] = self.plan 419 if self.plan.bindings: 420 self._logger.debug( 421 "f(exec): Continuing with next stage", 422 remaining=len(self.plan.bindings), 423 ) 424 kwargs = self.kwargs 425 kwargs.update({"flow_slug": self.flow.slug}) 426 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 427 # User passed all stages 428 self._logger.debug( 429 "f(exec): User passed all stages", 430 context=cleanse_dict(self.plan.context), 431 ) 432 return self._flow_done() 433 434 def stage_invalid(self, error_message: str | None = None) -> HttpResponse: 435 """Callback used stage when data is correct but a policy denies access 436 or the user account is disabled. 437 438 Optionally, an exception can be passed, which will be shown if the current user 439 is a superuser.""" 440 self._logger.debug("f(exec): Stage invalid") 441 if self.current_binding and self.current_binding.invalid_response_action in [ 442 InvalidResponseAction.RESTART, 443 InvalidResponseAction.RESTART_WITH_CONTEXT, 444 ]: 445 keep_context = ( 446 self.current_binding.invalid_response_action 447 == InvalidResponseAction.RESTART_WITH_CONTEXT 448 ) 449 self._logger.debug( 450 "f(exec): Invalid response, restarting flow", 451 keep_context=keep_context, 452 ) 453 return self.restart_flow(keep_context) 454 self.cancel() 455 challenge_view = AccessDeniedStage(self, error_message) 456 challenge_view.request = self.request 457 return to_stage_response(self.request, challenge_view.get(self.request)) 458 459 def cancel(self): 460 """Cancel current flow execution""" 461 keys_to_delete = [ 462 SESSION_KEY_PLAN, 463 SESSION_KEY_GET, 464 # We might need the initial POST payloads for later requests 465 # SESSION_KEY_POST, 466 # We don't delete the history on purpose, as a user might 467 # still be inspecting it. 468 # It's only deleted on a fresh executions 469 # SESSION_KEY_HISTORY, 470 ] 471 self._logger.debug("f(exec): cleaning up") 472 for key in keys_to_delete: 473 if key in self.request.session: 474 del self.request.session[key] 475 476 477class CancelView(View): 478 """View which cancels the currently active plan""" 479 480 def get(self, request: HttpRequest) -> HttpResponse: 481 """View which canels the currently active plan""" 482 if SESSION_KEY_PLAN in request.session: 483 del request.session[SESSION_KEY_PLAN] 484 LOGGER.debug("Canceled current plan") 485 next_url = self.request.GET.get(NEXT_ARG_NAME) 486 if next_url and not is_url_absolute(next_url): 487 return redirect(next_url) 488 return redirect("authentik_flows:default-invalidation") 489 490 491class ToDefaultFlow(View): 492 """Redirect to default flow matching by designation""" 493 494 designation: FlowDesignation | None = None 495 496 @staticmethod 497 def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None: 498 """Get a Flow by `**flow_filter` and check if the request from `request` can access it.""" 499 flows = Flow.objects.filter(**flow_filter).order_by("slug") 500 for flow in flows: 501 engine = PolicyEngine(flow, request.user, request) 502 engine.build() 503 result = engine.result 504 if result.passing: 505 LOGGER.debug("flow_by_policy: flow passing", flow=flow) 506 return flow 507 LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages) 508 LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter) 509 return None 510 511 @staticmethod 512 def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow: 513 """Get a flow for the selected designation""" 514 brand: Brand = request.brand 515 flow = None 516 # First, attempt to get default flow from brand 517 if designation == FlowDesignation.AUTHENTICATION: 518 flow = brand.flow_authentication 519 elif designation == FlowDesignation.INVALIDATION: 520 flow = brand.flow_invalidation 521 if flow: 522 return flow 523 # If no flow was set, get the first based on slug and policy 524 flow = ToDefaultFlow.flow_by_policy(request, designation=designation) 525 if flow: 526 return flow 527 # If we still don't have a flow, 404 528 raise Http404 529 530 def dispatch(self, request: HttpRequest) -> HttpResponse: 531 flow = ToDefaultFlow.get_flow(request, self.designation) 532 # If user already has a pending plan, clear it so we don't have to later. 533 if SESSION_KEY_PLAN in self.request.session: 534 plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 535 if plan.flow_pk != flow.pk.hex: 536 LOGGER.warning( 537 "f(def): Found existing plan for other flow, deleting plan", 538 flow_slug=flow.slug, 539 ) 540 del self.request.session[SESSION_KEY_PLAN] 541 return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug) 542 543 544def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse: 545 """Convert normal HttpResponse into JSON Response""" 546 if ( 547 isinstance(source, HttpResponseRedirect) 548 or source.status_code == HttpResponseRedirect.status_code 549 ): 550 redirect_url = source["Location"] 551 # Redirects to the same URL usually indicate an Error within a form 552 if request.get_full_path() == redirect_url: 553 return source 554 LOGGER.debug( 555 "converting to redirect challenge", 556 to=str(redirect_url), 557 current=request.path, 558 ) 559 return HttpChallengeResponse( 560 RedirectChallenge( 561 { 562 "to": str(redirect_url), 563 } 564 ) 565 ) 566 if isinstance(source, TemplateResponse): 567 return HttpChallengeResponse( 568 ShellChallenge( 569 { 570 "body": source.render().content.decode("utf-8"), 571 } 572 ) 573 ) 574 # Check for actual HttpResponse (without isinstance as we don't want to check inheritance) 575 if source.__class__ == HttpResponse: 576 return HttpChallengeResponse( 577 ShellChallenge( 578 { 579 "body": source.content.decode("utf-8"), 580 } 581 ) 582 ) 583 return source 584 585 586class ConfigureFlowInitView(LoginRequiredMixin, View): 587 """Initiate planner for selected change flow and redirect to flow executor, 588 or raise Http404 if no configure_flow has been set.""" 589 590 def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse: 591 """Initiate planner for selected change flow and redirect to flow executor, 592 or raise Http404 if no configure_flow has been set.""" 593 try: 594 stage: Stage = Stage.objects.get_subclass(pk=stage_uuid) 595 except Stage.DoesNotExist as exc: 596 raise Http404 from exc 597 if not isinstance(stage, ConfigurableStage): 598 LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage) 599 raise Http404 600 if not stage.configure_flow: 601 LOGGER.debug("Stage has no configure_flow set", stage=stage) 602 raise Http404 603 604 try: 605 plan = FlowPlanner(stage.configure_flow).plan( 606 request, {PLAN_CONTEXT_PENDING_USER: request.user} 607 ) 608 except FlowNonApplicableException: 609 LOGGER.warning("Flow not applicable to user") 610 raise Http404 from None 611 return plan.to_redirect(request, stage.configure_flow)
75def challenge_types(): 76 """This function returns a mapping which contains all subclasses of challenges 77 subclasses of Challenge, and Challenge itself.""" 78 mapping = {} 79 for cls in all_subclasses(Challenge): 80 if cls == WithUserInfoChallenge: 81 continue 82 mapping[cls().fields["component"].default] = cls 83 return mapping
This function returns a mapping which contains all subclasses of challenges subclasses of Challenge, and Challenge itself.
86def challenge_response_types(): 87 """This function returns a mapping which contains all subclasses of challenges 88 subclasses of Challenge, and Challenge itself.""" 89 mapping = {} 90 for cls in all_subclasses(ChallengeResponse): 91 mapping[cls(stage=None).fields["component"].default] = cls 92 return mapping
This function returns a mapping which contains all subclasses of challenges subclasses of Challenge, and Challenge itself.
95class InvalidStageError(SentryIgnoredException): 96 """Error raised when a challenge from a stage is not valid"""
Error raised when a challenge from a stage is not valid
99@method_decorator(xframe_options_sameorigin, name="dispatch") 100class FlowExecutorView(APIView): 101 """Flow executor, passing requests to Stage Views""" 102 103 permission_classes = [AllowAny] 104 105 flow: Flow = None 106 107 plan: FlowPlan | None = None 108 current_binding: FlowStageBinding | None = None 109 current_stage: Stage 110 current_stage_view: View 111 112 _logger: BoundLogger 113 114 def setup(self, request: HttpRequest, flow_slug: str): 115 super().setup(request, flow_slug=flow_slug) 116 if not self.flow: 117 self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug) 118 self._logger = get_logger().bind(flow_slug=flow_slug) 119 set_tag("authentik.flow", self.flow.slug) 120 121 def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse: 122 """When a flow is non-applicable check if user is on the correct domain""" 123 if self.flow.denied_action in [ 124 FlowDeniedAction.CONTINUE, 125 FlowDeniedAction.MESSAGE_CONTINUE, 126 ]: 127 next_url = self.request.GET.get(NEXT_ARG_NAME) 128 if next_url and not is_url_absolute(next_url): 129 self._logger.debug("f(exec): Redirecting to next on fail") 130 return to_stage_response(self.request, redirect(next_url)) 131 if self.flow.denied_action == FlowDeniedAction.CONTINUE: 132 return to_stage_response( 133 self.request, redirect(reverse("authentik_core:root-redirect")) 134 ) 135 return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages)) 136 137 def _check_flow_token(self, key: str) -> FlowPlan | None: 138 """Check if the user is using a flow token to restore a plan""" 139 token: FlowToken | None = FlowToken.objects.filter(key=key).first() 140 if not token: 141 return None 142 plan = None 143 try: 144 plan = token.plan 145 except (AttributeError, EOFError, ImportError, IndexError) as exc: 146 LOGGER.warning("f(exec): Failed to restore token plan", exc=exc) 147 finally: 148 if token.revoke_on_execution: 149 token.delete() 150 if not isinstance(plan, FlowPlan): 151 return None 152 if existing_plan := self.request.session.get(SESSION_KEY_PLAN): 153 plan.context.update(existing_plan.context) 154 plan.context[PLAN_CONTEXT_IS_RESTORED] = token 155 self._logger.debug("f(exec): restored flow plan from token", plan=plan) 156 return plan 157 158 def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse: 159 with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span: 160 span.set_data("authentik Flow", self.flow.slug) 161 get_params = QueryDict(request.GET.get(QS_QUERY, "")) 162 if QS_KEY_TOKEN in get_params: 163 plan = self._check_flow_token(get_params[QS_KEY_TOKEN]) 164 if plan: 165 self.request.session[SESSION_KEY_PLAN] = plan 166 # Early check if there's an active Plan for the current session 167 if SESSION_KEY_PLAN in self.request.session: 168 self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 169 if self.plan.flow_pk != self.flow.pk.hex: 170 self._logger.warning( 171 "f(exec): Found existing plan for other flow, deleting plan", 172 other_flow=self.plan.flow_pk, 173 ) 174 # Existing plan is deleted from session and instance 175 self.plan = None 176 self.cancel() 177 else: 178 self._logger.debug("f(exec): Continuing existing plan") 179 180 # Initial flow request, check if we have an upstream query string passed in 181 request.session[SESSION_KEY_GET] = get_params 182 # Don't check session again as we've either already loaded the plan or we need to plan 183 if not self.plan: 184 request.session[SESSION_KEY_HISTORY] = [] 185 self._logger.debug("f(exec): No active Plan found, initiating planner") 186 try: 187 self.plan = self._initiate_plan() 188 except FlowNonApplicableException as exc: 189 # If we're this flow is for authentication and the user is already authenticated 190 # continue to the next URL 191 if ( 192 self.flow.designation == FlowDesignation.AUTHENTICATION 193 and self.request.user.is_authenticated 194 ): 195 return self._flow_done() 196 self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) 197 return self.handle_invalid_flow(exc) 198 except EmptyFlowException as exc: 199 self._logger.warning("f(exec): Flow is empty", exc=exc) 200 # To match behaviour with loading an empty flow plan from cache, 201 # we don't show an error message here, but rather call _flow_done() 202 return self._flow_done() 203 # We don't save the Plan after getting the next stage 204 # as it hasn't been successfully passed yet 205 try: 206 # This is the first time we actually access any attribute on the selected plan 207 # if the cached plan is from an older version, it might have different attributes 208 # in which case we just delete the plan and invalidate everything 209 next_binding = self.plan.next(self.request) 210 except Exception as exc: # noqa 211 self._logger.warning( 212 "f(exec): found incompatible flow plan, invalidating run", exc=exc 213 ) 214 keys = cache.keys(f"{CACHE_PREFIX}*") 215 cache.delete_many(keys) 216 return self.stage_invalid() 217 if not next_binding: 218 self._logger.debug("f(exec): no more stages, flow is done.") 219 return self._flow_done() 220 self.current_binding = next_binding 221 self.current_stage = next_binding.stage 222 self._logger.debug( 223 "f(exec): Current stage", 224 current_stage=self.current_stage, 225 flow_slug=self.flow.slug, 226 ) 227 try: 228 stage_cls = self.current_stage.view 229 except NotImplementedError as exc: 230 self._logger.debug("Error getting stage type", exc=exc) 231 return self.stage_invalid() 232 self.current_stage_view = stage_cls(self) 233 self.current_stage_view.args = self.args 234 self.current_stage_view.kwargs = self.kwargs 235 self.current_stage_view.request = request 236 try: 237 return super().dispatch(request) 238 except InvalidStageError as exc: 239 return self.stage_invalid(str(exc)) 240 241 def handle_exception(self, exc: Exception) -> HttpResponse: 242 """Handle exception in stage execution""" 243 if settings.DEBUG or settings.TEST: 244 raise exc 245 self._logger.warning(exc) 246 if not should_ignore_exception(exc): 247 capture_exception(exc) 248 Event.new( 249 action=EventAction.SYSTEM_EXCEPTION, 250 message="System exception during flow execution.", 251 ).with_exception(exc).from_http(self.request) 252 challenge = FlowErrorChallenge(self.request, exc) 253 challenge.is_valid(raise_exception=True) 254 return to_stage_response(self.request, HttpChallengeResponse(challenge)) 255 256 @extend_schema( 257 responses={ 258 200: PolymorphicProxySerializer( 259 component_name="ChallengeTypes", 260 serializers=challenge_types, 261 resource_type_field_name="component", 262 ), 263 }, 264 request=OpenApiTypes.NONE, 265 parameters=[ 266 OpenApiParameter( 267 name="query", 268 location=OpenApiParameter.QUERY, 269 required=True, 270 description="Querystring as received", 271 type=OpenApiTypes.STR, 272 ) 273 ], 274 operation_id="flows_executor_get", 275 ) 276 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 277 """Get the next pending challenge from the currently active flow.""" 278 class_path = class_to_path(self.current_stage_view.__class__) 279 self._logger.debug( 280 "f(exec): Passing GET", 281 view_class=class_path, 282 stage=self.current_stage, 283 ) 284 try: 285 with ( 286 start_span( 287 op="authentik.flow.executor.stage", 288 name=class_path, 289 ) as span, 290 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 291 method=request.method.upper(), 292 stage_type=class_path, 293 ).time(), 294 ): 295 span.set_data("Method", request.method.upper()) 296 span.set_data("authentik Stage", self.current_stage_view) 297 span.set_data("authentik Flow", self.flow.slug) 298 stage_response = self.current_stage_view.dispatch(request) 299 return to_stage_response(request, stage_response) 300 except Exception as exc: # noqa 301 return self.handle_exception(exc) 302 303 @extend_schema( 304 responses={ 305 200: PolymorphicProxySerializer( 306 component_name="ChallengeTypes", 307 serializers=challenge_types, 308 resource_type_field_name="component", 309 ), 310 }, 311 request=PolymorphicProxySerializer( 312 component_name="FlowChallengeResponse", 313 serializers=challenge_response_types, 314 resource_type_field_name="component", 315 ), 316 parameters=[ 317 OpenApiParameter( 318 name="query", 319 location=OpenApiParameter.QUERY, 320 required=True, 321 description="Querystring as received", 322 type=OpenApiTypes.STR, 323 ) 324 ], 325 operation_id="flows_executor_solve", 326 ) 327 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 328 """Solve the previously retrieved challenge and advanced to the next stage.""" 329 class_path = class_to_path(self.current_stage_view.__class__) 330 self._logger.debug( 331 "f(exec): Passing POST", 332 view_class=class_path, 333 stage=self.current_stage, 334 ) 335 try: 336 with ( 337 start_span( 338 op="authentik.flow.executor.stage", 339 name=class_path, 340 ) as span, 341 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 342 method=request.method.upper(), 343 stage_type=class_path, 344 ).time(), 345 ): 346 span.set_data("Method", request.method.upper()) 347 span.set_data("authentik Stage", self.current_stage_view) 348 span.set_data("authentik Flow", self.flow.slug) 349 stage_response = self.current_stage_view.dispatch(request) 350 return to_stage_response(request, stage_response) 351 except Exception as exc: # noqa 352 return self.handle_exception(exc) 353 354 def _initiate_plan(self) -> FlowPlan: 355 planner = FlowPlanner(self.flow) 356 plan = planner.plan(self.request) 357 self.request.session[SESSION_KEY_PLAN] = plan 358 try: 359 # Call the has_stages getter to check that 360 # there are no issues with the class we might've gotten 361 # from the cache. If there are errors, just delete all cached flows 362 _ = plan.has_stages 363 except Exception: # noqa 364 keys = cache.keys(f"{CACHE_PREFIX}*") 365 cache.delete_many(keys) 366 return self._initiate_plan() 367 return plan 368 369 def restart_flow(self, keep_context=False) -> HttpResponse: 370 """Restart the currently active flow, optionally keeping the current context""" 371 planner = FlowPlanner(self.flow) 372 planner.use_cache = False 373 default_context = None 374 if keep_context: 375 default_context = self.plan.context 376 try: 377 plan = planner.plan(self.request, default_context) 378 except FlowNonApplicableException as exc: 379 self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc) 380 return self.handle_invalid_flow(exc) 381 self.request.session[SESSION_KEY_PLAN] = plan 382 kwargs = self.kwargs 383 kwargs.update({"flow_slug": self.flow.slug}) 384 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 385 386 def _flow_done(self) -> HttpResponse: 387 """User Successfully passed all stages""" 388 # Since this is wrapped by the ExecutorShell, the next argument is saved in the session 389 # extract the next param before cancel as that cleans it 390 if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context: 391 # The context `redirect` variable can only be set by 392 # an expression policy or authentik itself, so we don't 393 # check if its an absolute URL or a relative one 394 self.cancel() 395 return to_stage_response( 396 self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT)) 397 ) 398 next_param = self.request.session.get(SESSION_KEY_GET, {}).get( 399 NEXT_ARG_NAME, "authentik_core:root-redirect" 400 ) 401 self.cancel() 402 if next_param and not is_url_absolute(next_param): 403 return to_stage_response(self.request, redirect_with_qs(next_param)) 404 return to_stage_response( 405 self.request, self.stage_invalid(error_message=_("Invalid next URL")) 406 ) 407 408 def stage_ok(self) -> HttpResponse: 409 """Callback called by stages upon successful completion. 410 Persists updated plan and context to session.""" 411 self._logger.debug( 412 "f(exec): Stage ok", 413 stage_class=class_to_path(self.current_stage_view.__class__), 414 ) 415 if isinstance(self.current_stage_view, StageView): 416 self.current_stage_view.cleanup() 417 self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan)) 418 self.plan.pop() 419 self.request.session[SESSION_KEY_PLAN] = self.plan 420 if self.plan.bindings: 421 self._logger.debug( 422 "f(exec): Continuing with next stage", 423 remaining=len(self.plan.bindings), 424 ) 425 kwargs = self.kwargs 426 kwargs.update({"flow_slug": self.flow.slug}) 427 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 428 # User passed all stages 429 self._logger.debug( 430 "f(exec): User passed all stages", 431 context=cleanse_dict(self.plan.context), 432 ) 433 return self._flow_done() 434 435 def stage_invalid(self, error_message: str | None = None) -> HttpResponse: 436 """Callback used stage when data is correct but a policy denies access 437 or the user account is disabled. 438 439 Optionally, an exception can be passed, which will be shown if the current user 440 is a superuser.""" 441 self._logger.debug("f(exec): Stage invalid") 442 if self.current_binding and self.current_binding.invalid_response_action in [ 443 InvalidResponseAction.RESTART, 444 InvalidResponseAction.RESTART_WITH_CONTEXT, 445 ]: 446 keep_context = ( 447 self.current_binding.invalid_response_action 448 == InvalidResponseAction.RESTART_WITH_CONTEXT 449 ) 450 self._logger.debug( 451 "f(exec): Invalid response, restarting flow", 452 keep_context=keep_context, 453 ) 454 return self.restart_flow(keep_context) 455 self.cancel() 456 challenge_view = AccessDeniedStage(self, error_message) 457 challenge_view.request = self.request 458 return to_stage_response(self.request, challenge_view.get(self.request)) 459 460 def cancel(self): 461 """Cancel current flow execution""" 462 keys_to_delete = [ 463 SESSION_KEY_PLAN, 464 SESSION_KEY_GET, 465 # We might need the initial POST payloads for later requests 466 # SESSION_KEY_POST, 467 # We don't delete the history on purpose, as a user might 468 # still be inspecting it. 469 # It's only deleted on a fresh executions 470 # SESSION_KEY_HISTORY, 471 ] 472 self._logger.debug("f(exec): cleaning up") 473 for key in keys_to_delete: 474 if key in self.request.session: 475 del self.request.session[key]
Flow executor, passing requests to Stage Views
114 def setup(self, request: HttpRequest, flow_slug: str): 115 super().setup(request, flow_slug=flow_slug) 116 if not self.flow: 117 self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug) 118 self._logger = get_logger().bind(flow_slug=flow_slug) 119 set_tag("authentik.flow", self.flow.slug)
Initialize attributes shared by all view methods.
121 def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse: 122 """When a flow is non-applicable check if user is on the correct domain""" 123 if self.flow.denied_action in [ 124 FlowDeniedAction.CONTINUE, 125 FlowDeniedAction.MESSAGE_CONTINUE, 126 ]: 127 next_url = self.request.GET.get(NEXT_ARG_NAME) 128 if next_url and not is_url_absolute(next_url): 129 self._logger.debug("f(exec): Redirecting to next on fail") 130 return to_stage_response(self.request, redirect(next_url)) 131 if self.flow.denied_action == FlowDeniedAction.CONTINUE: 132 return to_stage_response( 133 self.request, redirect(reverse("authentik_core:root-redirect")) 134 ) 135 return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))
When a flow is non-applicable check if user is on the correct domain
158 def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse: 159 with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span: 160 span.set_data("authentik Flow", self.flow.slug) 161 get_params = QueryDict(request.GET.get(QS_QUERY, "")) 162 if QS_KEY_TOKEN in get_params: 163 plan = self._check_flow_token(get_params[QS_KEY_TOKEN]) 164 if plan: 165 self.request.session[SESSION_KEY_PLAN] = plan 166 # Early check if there's an active Plan for the current session 167 if SESSION_KEY_PLAN in self.request.session: 168 self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 169 if self.plan.flow_pk != self.flow.pk.hex: 170 self._logger.warning( 171 "f(exec): Found existing plan for other flow, deleting plan", 172 other_flow=self.plan.flow_pk, 173 ) 174 # Existing plan is deleted from session and instance 175 self.plan = None 176 self.cancel() 177 else: 178 self._logger.debug("f(exec): Continuing existing plan") 179 180 # Initial flow request, check if we have an upstream query string passed in 181 request.session[SESSION_KEY_GET] = get_params 182 # Don't check session again as we've either already loaded the plan or we need to plan 183 if not self.plan: 184 request.session[SESSION_KEY_HISTORY] = [] 185 self._logger.debug("f(exec): No active Plan found, initiating planner") 186 try: 187 self.plan = self._initiate_plan() 188 except FlowNonApplicableException as exc: 189 # If we're this flow is for authentication and the user is already authenticated 190 # continue to the next URL 191 if ( 192 self.flow.designation == FlowDesignation.AUTHENTICATION 193 and self.request.user.is_authenticated 194 ): 195 return self._flow_done() 196 self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) 197 return self.handle_invalid_flow(exc) 198 except EmptyFlowException as exc: 199 self._logger.warning("f(exec): Flow is empty", exc=exc) 200 # To match behaviour with loading an empty flow plan from cache, 201 # we don't show an error message here, but rather call _flow_done() 202 return self._flow_done() 203 # We don't save the Plan after getting the next stage 204 # as it hasn't been successfully passed yet 205 try: 206 # This is the first time we actually access any attribute on the selected plan 207 # if the cached plan is from an older version, it might have different attributes 208 # in which case we just delete the plan and invalidate everything 209 next_binding = self.plan.next(self.request) 210 except Exception as exc: # noqa 211 self._logger.warning( 212 "f(exec): found incompatible flow plan, invalidating run", exc=exc 213 ) 214 keys = cache.keys(f"{CACHE_PREFIX}*") 215 cache.delete_many(keys) 216 return self.stage_invalid() 217 if not next_binding: 218 self._logger.debug("f(exec): no more stages, flow is done.") 219 return self._flow_done() 220 self.current_binding = next_binding 221 self.current_stage = next_binding.stage 222 self._logger.debug( 223 "f(exec): Current stage", 224 current_stage=self.current_stage, 225 flow_slug=self.flow.slug, 226 ) 227 try: 228 stage_cls = self.current_stage.view 229 except NotImplementedError as exc: 230 self._logger.debug("Error getting stage type", exc=exc) 231 return self.stage_invalid() 232 self.current_stage_view = stage_cls(self) 233 self.current_stage_view.args = self.args 234 self.current_stage_view.kwargs = self.kwargs 235 self.current_stage_view.request = request 236 try: 237 return super().dispatch(request) 238 except InvalidStageError as exc: 239 return self.stage_invalid(str(exc))
.dispatch() is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
241 def handle_exception(self, exc: Exception) -> HttpResponse: 242 """Handle exception in stage execution""" 243 if settings.DEBUG or settings.TEST: 244 raise exc 245 self._logger.warning(exc) 246 if not should_ignore_exception(exc): 247 capture_exception(exc) 248 Event.new( 249 action=EventAction.SYSTEM_EXCEPTION, 250 message="System exception during flow execution.", 251 ).with_exception(exc).from_http(self.request) 252 challenge = FlowErrorChallenge(self.request, exc) 253 challenge.is_valid(raise_exception=True) 254 return to_stage_response(self.request, HttpChallengeResponse(challenge))
Handle exception in stage execution
256 @extend_schema( 257 responses={ 258 200: PolymorphicProxySerializer( 259 component_name="ChallengeTypes", 260 serializers=challenge_types, 261 resource_type_field_name="component", 262 ), 263 }, 264 request=OpenApiTypes.NONE, 265 parameters=[ 266 OpenApiParameter( 267 name="query", 268 location=OpenApiParameter.QUERY, 269 required=True, 270 description="Querystring as received", 271 type=OpenApiTypes.STR, 272 ) 273 ], 274 operation_id="flows_executor_get", 275 ) 276 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 277 """Get the next pending challenge from the currently active flow.""" 278 class_path = class_to_path(self.current_stage_view.__class__) 279 self._logger.debug( 280 "f(exec): Passing GET", 281 view_class=class_path, 282 stage=self.current_stage, 283 ) 284 try: 285 with ( 286 start_span( 287 op="authentik.flow.executor.stage", 288 name=class_path, 289 ) as span, 290 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 291 method=request.method.upper(), 292 stage_type=class_path, 293 ).time(), 294 ): 295 span.set_data("Method", request.method.upper()) 296 span.set_data("authentik Stage", self.current_stage_view) 297 span.set_data("authentik Flow", self.flow.slug) 298 stage_response = self.current_stage_view.dispatch(request) 299 return to_stage_response(request, stage_response) 300 except Exception as exc: # noqa 301 return self.handle_exception(exc)
Get the next pending challenge from the currently active flow.
303 @extend_schema( 304 responses={ 305 200: PolymorphicProxySerializer( 306 component_name="ChallengeTypes", 307 serializers=challenge_types, 308 resource_type_field_name="component", 309 ), 310 }, 311 request=PolymorphicProxySerializer( 312 component_name="FlowChallengeResponse", 313 serializers=challenge_response_types, 314 resource_type_field_name="component", 315 ), 316 parameters=[ 317 OpenApiParameter( 318 name="query", 319 location=OpenApiParameter.QUERY, 320 required=True, 321 description="Querystring as received", 322 type=OpenApiTypes.STR, 323 ) 324 ], 325 operation_id="flows_executor_solve", 326 ) 327 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 328 """Solve the previously retrieved challenge and advanced to the next stage.""" 329 class_path = class_to_path(self.current_stage_view.__class__) 330 self._logger.debug( 331 "f(exec): Passing POST", 332 view_class=class_path, 333 stage=self.current_stage, 334 ) 335 try: 336 with ( 337 start_span( 338 op="authentik.flow.executor.stage", 339 name=class_path, 340 ) as span, 341 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 342 method=request.method.upper(), 343 stage_type=class_path, 344 ).time(), 345 ): 346 span.set_data("Method", request.method.upper()) 347 span.set_data("authentik Stage", self.current_stage_view) 348 span.set_data("authentik Flow", self.flow.slug) 349 stage_response = self.current_stage_view.dispatch(request) 350 return to_stage_response(request, stage_response) 351 except Exception as exc: # noqa 352 return self.handle_exception(exc)
Solve the previously retrieved challenge and advanced to the next stage.
369 def restart_flow(self, keep_context=False) -> HttpResponse: 370 """Restart the currently active flow, optionally keeping the current context""" 371 planner = FlowPlanner(self.flow) 372 planner.use_cache = False 373 default_context = None 374 if keep_context: 375 default_context = self.plan.context 376 try: 377 plan = planner.plan(self.request, default_context) 378 except FlowNonApplicableException as exc: 379 self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc) 380 return self.handle_invalid_flow(exc) 381 self.request.session[SESSION_KEY_PLAN] = plan 382 kwargs = self.kwargs 383 kwargs.update({"flow_slug": self.flow.slug}) 384 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
Restart the currently active flow, optionally keeping the current context
408 def stage_ok(self) -> HttpResponse: 409 """Callback called by stages upon successful completion. 410 Persists updated plan and context to session.""" 411 self._logger.debug( 412 "f(exec): Stage ok", 413 stage_class=class_to_path(self.current_stage_view.__class__), 414 ) 415 if isinstance(self.current_stage_view, StageView): 416 self.current_stage_view.cleanup() 417 self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan)) 418 self.plan.pop() 419 self.request.session[SESSION_KEY_PLAN] = self.plan 420 if self.plan.bindings: 421 self._logger.debug( 422 "f(exec): Continuing with next stage", 423 remaining=len(self.plan.bindings), 424 ) 425 kwargs = self.kwargs 426 kwargs.update({"flow_slug": self.flow.slug}) 427 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 428 # User passed all stages 429 self._logger.debug( 430 "f(exec): User passed all stages", 431 context=cleanse_dict(self.plan.context), 432 ) 433 return self._flow_done()
Callback called by stages upon successful completion. Persists updated plan and context to session.
435 def stage_invalid(self, error_message: str | None = None) -> HttpResponse: 436 """Callback used stage when data is correct but a policy denies access 437 or the user account is disabled. 438 439 Optionally, an exception can be passed, which will be shown if the current user 440 is a superuser.""" 441 self._logger.debug("f(exec): Stage invalid") 442 if self.current_binding and self.current_binding.invalid_response_action in [ 443 InvalidResponseAction.RESTART, 444 InvalidResponseAction.RESTART_WITH_CONTEXT, 445 ]: 446 keep_context = ( 447 self.current_binding.invalid_response_action 448 == InvalidResponseAction.RESTART_WITH_CONTEXT 449 ) 450 self._logger.debug( 451 "f(exec): Invalid response, restarting flow", 452 keep_context=keep_context, 453 ) 454 return self.restart_flow(keep_context) 455 self.cancel() 456 challenge_view = AccessDeniedStage(self, error_message) 457 challenge_view.request = self.request 458 return to_stage_response(self.request, challenge_view.get(self.request))
Callback used stage when data is correct but a policy denies access or the user account is disabled.
Optionally, an exception can be passed, which will be shown if the current user is a superuser.
460 def cancel(self): 461 """Cancel current flow execution""" 462 keys_to_delete = [ 463 SESSION_KEY_PLAN, 464 SESSION_KEY_GET, 465 # We might need the initial POST payloads for later requests 466 # SESSION_KEY_POST, 467 # We don't delete the history on purpose, as a user might 468 # still be inspecting it. 469 # It's only deleted on a fresh executions 470 # SESSION_KEY_HISTORY, 471 ] 472 self._logger.debug("f(exec): cleaning up") 473 for key in keys_to_delete: 474 if key in self.request.session: 475 del self.request.session[key]
Cancel current flow execution
478class CancelView(View): 479 """View which cancels the currently active plan""" 480 481 def get(self, request: HttpRequest) -> HttpResponse: 482 """View which canels the currently active plan""" 483 if SESSION_KEY_PLAN in request.session: 484 del request.session[SESSION_KEY_PLAN] 485 LOGGER.debug("Canceled current plan") 486 next_url = self.request.GET.get(NEXT_ARG_NAME) 487 if next_url and not is_url_absolute(next_url): 488 return redirect(next_url) 489 return redirect("authentik_flows:default-invalidation")
View which cancels the currently active plan
481 def get(self, request: HttpRequest) -> HttpResponse: 482 """View which canels the currently active plan""" 483 if SESSION_KEY_PLAN in request.session: 484 del request.session[SESSION_KEY_PLAN] 485 LOGGER.debug("Canceled current plan") 486 next_url = self.request.GET.get(NEXT_ARG_NAME) 487 if next_url and not is_url_absolute(next_url): 488 return redirect(next_url) 489 return redirect("authentik_flows:default-invalidation")
View which canels the currently active plan
492class ToDefaultFlow(View): 493 """Redirect to default flow matching by designation""" 494 495 designation: FlowDesignation | None = None 496 497 @staticmethod 498 def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None: 499 """Get a Flow by `**flow_filter` and check if the request from `request` can access it.""" 500 flows = Flow.objects.filter(**flow_filter).order_by("slug") 501 for flow in flows: 502 engine = PolicyEngine(flow, request.user, request) 503 engine.build() 504 result = engine.result 505 if result.passing: 506 LOGGER.debug("flow_by_policy: flow passing", flow=flow) 507 return flow 508 LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages) 509 LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter) 510 return None 511 512 @staticmethod 513 def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow: 514 """Get a flow for the selected designation""" 515 brand: Brand = request.brand 516 flow = None 517 # First, attempt to get default flow from brand 518 if designation == FlowDesignation.AUTHENTICATION: 519 flow = brand.flow_authentication 520 elif designation == FlowDesignation.INVALIDATION: 521 flow = brand.flow_invalidation 522 if flow: 523 return flow 524 # If no flow was set, get the first based on slug and policy 525 flow = ToDefaultFlow.flow_by_policy(request, designation=designation) 526 if flow: 527 return flow 528 # If we still don't have a flow, 404 529 raise Http404 530 531 def dispatch(self, request: HttpRequest) -> HttpResponse: 532 flow = ToDefaultFlow.get_flow(request, self.designation) 533 # If user already has a pending plan, clear it so we don't have to later. 534 if SESSION_KEY_PLAN in self.request.session: 535 plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 536 if plan.flow_pk != flow.pk.hex: 537 LOGGER.warning( 538 "f(def): Found existing plan for other flow, deleting plan", 539 flow_slug=flow.slug, 540 ) 541 del self.request.session[SESSION_KEY_PLAN] 542 return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
Redirect to default flow matching by designation
497 @staticmethod 498 def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None: 499 """Get a Flow by `**flow_filter` and check if the request from `request` can access it.""" 500 flows = Flow.objects.filter(**flow_filter).order_by("slug") 501 for flow in flows: 502 engine = PolicyEngine(flow, request.user, request) 503 engine.build() 504 result = engine.result 505 if result.passing: 506 LOGGER.debug("flow_by_policy: flow passing", flow=flow) 507 return flow 508 LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages) 509 LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter) 510 return None
Get a Flow by **flow_filter and check if the request from request can access it.
512 @staticmethod 513 def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow: 514 """Get a flow for the selected designation""" 515 brand: Brand = request.brand 516 flow = None 517 # First, attempt to get default flow from brand 518 if designation == FlowDesignation.AUTHENTICATION: 519 flow = brand.flow_authentication 520 elif designation == FlowDesignation.INVALIDATION: 521 flow = brand.flow_invalidation 522 if flow: 523 return flow 524 # If no flow was set, get the first based on slug and policy 525 flow = ToDefaultFlow.flow_by_policy(request, designation=designation) 526 if flow: 527 return flow 528 # If we still don't have a flow, 404 529 raise Http404
Get a flow for the selected designation
531 def dispatch(self, request: HttpRequest) -> HttpResponse: 532 flow = ToDefaultFlow.get_flow(request, self.designation) 533 # If user already has a pending plan, clear it so we don't have to later. 534 if SESSION_KEY_PLAN in self.request.session: 535 plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 536 if plan.flow_pk != flow.pk.hex: 537 LOGGER.warning( 538 "f(def): Found existing plan for other flow, deleting plan", 539 flow_slug=flow.slug, 540 ) 541 del self.request.session[SESSION_KEY_PLAN] 542 return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
545def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse: 546 """Convert normal HttpResponse into JSON Response""" 547 if ( 548 isinstance(source, HttpResponseRedirect) 549 or source.status_code == HttpResponseRedirect.status_code 550 ): 551 redirect_url = source["Location"] 552 # Redirects to the same URL usually indicate an Error within a form 553 if request.get_full_path() == redirect_url: 554 return source 555 LOGGER.debug( 556 "converting to redirect challenge", 557 to=str(redirect_url), 558 current=request.path, 559 ) 560 return HttpChallengeResponse( 561 RedirectChallenge( 562 { 563 "to": str(redirect_url), 564 } 565 ) 566 ) 567 if isinstance(source, TemplateResponse): 568 return HttpChallengeResponse( 569 ShellChallenge( 570 { 571 "body": source.render().content.decode("utf-8"), 572 } 573 ) 574 ) 575 # Check for actual HttpResponse (without isinstance as we don't want to check inheritance) 576 if source.__class__ == HttpResponse: 577 return HttpChallengeResponse( 578 ShellChallenge( 579 { 580 "body": source.content.decode("utf-8"), 581 } 582 ) 583 ) 584 return source
Convert normal HttpResponse into JSON Response
587class ConfigureFlowInitView(LoginRequiredMixin, View): 588 """Initiate planner for selected change flow and redirect to flow executor, 589 or raise Http404 if no configure_flow has been set.""" 590 591 def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse: 592 """Initiate planner for selected change flow and redirect to flow executor, 593 or raise Http404 if no configure_flow has been set.""" 594 try: 595 stage: Stage = Stage.objects.get_subclass(pk=stage_uuid) 596 except Stage.DoesNotExist as exc: 597 raise Http404 from exc 598 if not isinstance(stage, ConfigurableStage): 599 LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage) 600 raise Http404 601 if not stage.configure_flow: 602 LOGGER.debug("Stage has no configure_flow set", stage=stage) 603 raise Http404 604 605 try: 606 plan = FlowPlanner(stage.configure_flow).plan( 607 request, {PLAN_CONTEXT_PENDING_USER: request.user} 608 ) 609 except FlowNonApplicableException: 610 LOGGER.warning("Flow not applicable to user") 611 raise Http404 from None 612 return plan.to_redirect(request, stage.configure_flow)
Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.
591 def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse: 592 """Initiate planner for selected change flow and redirect to flow executor, 593 or raise Http404 if no configure_flow has been set.""" 594 try: 595 stage: Stage = Stage.objects.get_subclass(pk=stage_uuid) 596 except Stage.DoesNotExist as exc: 597 raise Http404 from exc 598 if not isinstance(stage, ConfigurableStage): 599 LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage) 600 raise Http404 601 if not stage.configure_flow: 602 LOGGER.debug("Stage has no configure_flow set", stage=stage) 603 raise Http404 604 605 try: 606 plan = FlowPlanner(stage.configure_flow).plan( 607 request, {PLAN_CONTEXT_PENDING_USER: request.user} 608 ) 609 except FlowNonApplicableException: 610 LOGGER.warning("Flow not applicable to user") 611 raise Http404 from None 612 return plan.to_redirect(request, stage.configure_flow)
Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.