authentik.flows.views.executor
authentik multi-stage authentication engine
"""authentik multi-stage authentication engine"""

from copy import deepcopy

from django.conf import settings
from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.cache import cache
from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect
from django.http.request import QueryDict
from django.shortcuts import get_object_or_404, redirect
from django.template.response import TemplateResponse
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.translation import gettext as _
from django.views.decorators.clickjacking import xframe_options_sameorigin
from django.views.generic import View
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer, extend_schema
from rest_framework.permissions import AllowAny
from rest_framework.views import APIView
from sentry_sdk import capture_exception, start_span
from sentry_sdk.api import set_tag
from structlog.stdlib import BoundLogger, get_logger

from authentik.brands.models import Brand
from authentik.events.models import Event, EventAction, cleanse_dict
from authentik.flows.apps import HIST_FLOW_EXECUTION_STAGE_TIME
from authentik.flows.challenge import (
    Challenge,
    ChallengeResponse,
    FlowErrorChallenge,
    HttpChallengeResponse,
    RedirectChallenge,
    ShellChallenge,
    WithUserInfoChallenge,
)
from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
from authentik.flows.models import (
    ConfigurableStage,
    Flow,
    FlowDeniedAction,
    FlowDesignation,
    FlowStageBinding,
    FlowToken,
    InvalidResponseAction,
    Stage,
)
from authentik.flows.planner import (
    CACHE_PREFIX,
    PLAN_CONTEXT_IS_RESTORED,
    PLAN_CONTEXT_PENDING_USER,
    PLAN_CONTEXT_REDIRECT,
    FlowPlan,
    FlowPlanner,
)
from authentik.flows.stage import AccessDeniedStage, StageView
from authentik.lib.sentry import SentryIgnoredException, should_ignore_exception
from authentik.lib.utils.reflection import all_subclasses, class_to_path
from authentik.lib.utils.urls import is_url_absolute, redirect_with_qs
from authentik.policies.engine import PolicyEngine

LOGGER = get_logger()
# Argument used to redirect user after login
NEXT_ARG_NAME = "next"
SESSION_KEY_PLAN = "authentik/flows/plan"
SESSION_KEY_GET = "authentik/flows/get"
SESSION_KEY_POST = "authentik/flows/post"
SESSION_KEY_HISTORY = "authentik/flows/history"
QS_KEY_TOKEN = "flow_token"  # nosec
QS_QUERY = "query"


def challenge_types():
    """Return a mapping of component name to challenge class.

    Every class yielded by ``all_subclasses(Challenge)`` is included, except
    ``WithUserInfoChallenge``. The key is the default value of the class'
    ``component`` field."""
    mapping = {}
    for cls in all_subclasses(Challenge):
        if cls == WithUserInfoChallenge:
            continue
        mapping[cls().fields["component"].default] = cls
    return mapping


def challenge_response_types():
    """Return a mapping of component name to challenge response class.

    Every class yielded by ``all_subclasses(ChallengeResponse)`` is included.
    The key is the default value of the class' ``component`` field."""
    mapping = {}
    for cls in all_subclasses(ChallengeResponse):
        mapping[cls(stage=None).fields["component"].default] = cls
    return mapping


class InvalidStageError(SentryIgnoredException):
    """Error raised when a challenge from a stage is not valid"""


@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowExecutorView(APIView):
    """Flow executor, passing requests to Stage Views"""

    permission_classes = [AllowAny]

    flow: Flow = None

    plan: FlowPlan | None = None
    current_binding: FlowStageBinding | None = None
    current_stage: Stage
    current_stage_view: View

    _logger: BoundLogger

    def setup(self, request: HttpRequest, flow_slug: str):
        """Resolve the flow for `flow_slug` (404 if it does not exist), unless a flow
        has already been set on the view, and bind a logger to the slug."""
        super().setup(request, flow_slug=flow_slug)
        if not self.flow:
            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
        self._logger = get_logger().bind(flow_slug=flow_slug)
        set_tag("authentik.flow", self.flow.slug)

    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
        """Build the response for a flow that is not applicable to the current request,
        based on the flow's `denied_action`.

        For CONTINUE and MESSAGE_CONTINUE a relative `next` URL from the query string is
        redirected to. CONTINUE without a usable `next` redirects to the root redirect.
        Everything else ends in the access denied response carrying `exc.messages`."""
        if self.flow.denied_action in [
            FlowDeniedAction.CONTINUE,
            FlowDeniedAction.MESSAGE_CONTINUE,
        ]:
            next_url = self.request.GET.get(NEXT_ARG_NAME)
            # Only relative URLs are followed
            if next_url and not is_url_absolute(next_url):
                self._logger.debug("f(exec): Redirecting to next on fail")
                return to_stage_response(self.request, redirect(next_url))
        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
            return to_stage_response(
                self.request, redirect(reverse("authentik_core:root-redirect"))
            )
        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))

    def _check_flow_token(self, key: str) -> FlowPlan | None:
        """Check if the user is using a flow token to restore a plan

        Returns the restored plan, or None when no token matches `key` or the token
        does not yield a FlowPlan. The context of a plan already in the session is
        merged into the restored plan."""
        token: FlowToken | None = FlowToken.objects.filter(key=key).first()
        if not token:
            return None
        plan = None
        try:
            plan = token.plan
        except (AttributeError, EOFError, ImportError, IndexError) as exc:
            LOGGER.warning("f(exec): Failed to restore token plan", exc=exc)
        finally:
            # Revocation happens regardless of whether the plan could be loaded
            if token.revoke_on_execution:
                token.delete()
        if not isinstance(plan, FlowPlan):
            return None
        if existing_plan := self.request.session.get(SESSION_KEY_PLAN):
            plan.context.update(existing_plan.context)
        plan.context[PLAN_CONTEXT_IS_RESTORED] = token
        self._logger.debug("f(exec): restored flow plan from token", plan=plan)
        return plan

    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
        """Load or create the flow plan for this session, select the next stage and
        hand the request over to the regular method dispatching (`get`/`post`)."""
        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
            span.set_data("authentik Flow", self.flow.slug)
            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
            if QS_KEY_TOKEN in get_params:
                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
                if plan:
                    self.request.session[SESSION_KEY_PLAN] = plan
            # Early check if there's an active Plan for the current session
            if SESSION_KEY_PLAN in self.request.session:
                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
                if self.plan.flow_pk != self.flow.pk.hex:
                    self._logger.warning(
                        "f(exec): Found existing plan for other flow, deleting plan",
                        other_flow=self.plan.flow_pk,
                    )
                    # Existing plan is deleted from session and instance
                    self.plan = None
                    self.cancel()
                else:
                    self._logger.debug("f(exec): Continuing existing plan")

            # Initial flow request, check if we have an upstream query string passed in
            request.session[SESSION_KEY_GET] = get_params
            # Don't check session again as we've either already loaded the plan or we need to plan
            if not self.plan:
                request.session[SESSION_KEY_HISTORY] = []
                self._logger.debug("f(exec): No active Plan found, initiating planner")
                try:
                    self.plan = self._initiate_plan()
                except FlowNonApplicableException as exc:
                    # If this flow is for authentication and the user is already
                    # authenticated, continue to the next URL
                    if (
                        self.flow.designation == FlowDesignation.AUTHENTICATION
                        and self.request.user.is_authenticated
                    ):
                        return self._flow_done()
                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
                    return self.handle_invalid_flow(exc)
                except EmptyFlowException as exc:
                    self._logger.warning("f(exec): Flow is empty", exc=exc)
                    # To match behaviour with loading an empty flow plan from cache,
                    # we don't show an error message here, but rather call _flow_done()
                    return self._flow_done()
            # We don't save the Plan after getting the next stage
            # as it hasn't been successfully passed yet
            try:
                # This is the first time we actually access any attribute on the selected plan
                # if the cached plan is from an older version, it might have different attributes
                # in which case we just delete the plan and invalidate everything
                next_binding = self.plan.next(self.request)
            except Exception as exc:  # noqa
                self._logger.warning(
                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
                )
                keys = cache.keys(f"{CACHE_PREFIX}*")
                cache.delete_many(keys)
                return self.stage_invalid()
            if not next_binding:
                self._logger.debug("f(exec): no more stages, flow is done.")
                return self._flow_done()
            self.current_binding = next_binding
            self.current_stage = next_binding.stage
            self._logger.debug(
                "f(exec): Current stage",
                current_stage=self.current_stage,
                flow_slug=self.flow.slug,
            )
            try:
                stage_cls = self.current_stage.view
            except NotImplementedError as exc:
                self._logger.debug("Error getting stage type", exc=exc)
                return self.stage_invalid()
            self.current_stage_view = stage_cls(self)
            self.current_stage_view.args = self.args
            self.current_stage_view.kwargs = self.kwargs
            self.current_stage_view.request = request
            try:
                return super().dispatch(request)
            except InvalidStageError as exc:
                return self.stage_invalid(str(exc))

    def handle_exception(self, exc: Exception) -> HttpResponse:
        """Handle exception in stage execution

        Re-raises when `settings.DEBUG` or `settings.TEST` is set. Otherwise the
        exception is logged, reported (unless it should be ignored) and returned to
        the client as a FlowErrorChallenge."""
        if settings.DEBUG or settings.TEST:
            raise exc
        self._logger.warning(exc)
        if not should_ignore_exception(exc):
            capture_exception(exc)
            Event.new(
                action=EventAction.SYSTEM_EXCEPTION,
                message="System exception during flow execution.",
            ).with_exception(exc).from_http(self.request)
        challenge = FlowErrorChallenge(self.request, exc)
        challenge.is_valid(raise_exception=True)
        return to_stage_response(self.request, HttpChallengeResponse(challenge))

    def _execute_stage(self, request: HttpRequest, log_message: str) -> HttpResponse:
        """Pass `request` to the current stage view and convert its response.

        Shared by `get` and `post`; `log_message` is the debug message emitted before
        the stage runs. Any exception raised by the stage is routed through
        `handle_exception`."""
        class_path = class_to_path(self.current_stage_view.__class__)
        self._logger.debug(
            log_message,
            view_class=class_path,
            stage=self.current_stage,
        )
        try:
            with (
                start_span(
                    op="authentik.flow.executor.stage",
                    name=class_path,
                ) as span,
                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
                    method=request.method.upper(),
                    stage_type=class_path,
                ).time(),
            ):
                span.set_data("Method", request.method.upper())
                span.set_data("authentik Stage", self.current_stage_view)
                span.set_data("authentik Flow", self.flow.slug)
                stage_response = self.current_stage_view.dispatch(request)
                return to_stage_response(request, stage_response)
        except Exception as exc:  # noqa
            return self.handle_exception(exc)

    @extend_schema(
        responses={
            200: PolymorphicProxySerializer(
                component_name="ChallengeTypes",
                serializers=challenge_types,
                resource_type_field_name="component",
            ),
        },
        request=OpenApiTypes.NONE,
        parameters=[
            OpenApiParameter(
                name="query",
                location=OpenApiParameter.QUERY,
                required=True,
                description="Querystring as received",
                type=OpenApiTypes.STR,
            )
        ],
        operation_id="flows_executor_get",
    )
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Get the next pending challenge from the currently active flow."""
        return self._execute_stage(request, "f(exec): Passing GET")

    @extend_schema(
        responses={
            200: PolymorphicProxySerializer(
                component_name="ChallengeTypes",
                serializers=challenge_types,
                resource_type_field_name="component",
            ),
        },
        request=PolymorphicProxySerializer(
            component_name="FlowChallengeResponse",
            serializers=challenge_response_types,
            resource_type_field_name="component",
        ),
        parameters=[
            OpenApiParameter(
                name="query",
                location=OpenApiParameter.QUERY,
                required=True,
                description="Querystring as received",
                type=OpenApiTypes.STR,
            )
        ],
        operation_id="flows_executor_solve",
    )
    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Solve the previously retrieved challenge and advanced to the next stage."""
        return self._execute_stage(request, "f(exec): Passing POST")

    def _initiate_plan(self) -> FlowPlan:
        """Plan the flow for the current request and store the plan in the session.

        If reading `has_stages` from the plan raises, all cached plans are deleted
        and planning is attempted again."""
        planner = FlowPlanner(self.flow)
        plan = planner.plan(self.request)
        self.request.session[SESSION_KEY_PLAN] = plan
        try:
            # Call the has_stages getter to check that
            # there are no issues with the class we might've gotten
            # from the cache. If there are errors, just delete all cached flows
            _ = plan.has_stages
        except Exception:  # noqa
            keys = cache.keys(f"{CACHE_PREFIX}*")
            cache.delete_many(keys)
            return self._initiate_plan()
        return plan

    def restart_flow(self, keep_context=False) -> HttpResponse:
        """Restart the currently active flow, optionally keeping the current context"""
        planner = FlowPlanner(self.flow)
        # Always plan from scratch on restart
        planner.use_cache = False
        default_context = None
        if keep_context:
            default_context = self.plan.context
        try:
            plan = planner.plan(self.request, default_context)
        except FlowNonApplicableException as exc:
            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
            return self.handle_invalid_flow(exc)
        self.request.session[SESSION_KEY_PLAN] = plan
        kwargs = self.kwargs
        kwargs.update({"flow_slug": self.flow.slug})
        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)

    def _flow_done(self) -> HttpResponse:
        """User Successfully passed all stages"""
        # Since this is wrapped by the ExecutorShell, the next argument is saved in the session
        # extract the next param before cancel as that cleans it
        if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context:
            # The context `redirect` variable can only be set by
            # an expression policy or authentik itself, so we don't
            # check if its an absolute URL or a relative one
            self.cancel()
            return to_stage_response(
                self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT))
            )
        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(
            NEXT_ARG_NAME, "authentik_core:root-redirect"
        )
        self.cancel()
        if next_param and not is_url_absolute(next_param):
            return to_stage_response(self.request, redirect_with_qs(next_param))
        return to_stage_response(
            self.request, self.stage_invalid(error_message=_("Invalid next URL"))
        )

    def stage_ok(self) -> HttpResponse:
        """Callback called by stages upon successful completion.
        Persists updated plan and context to session."""
        self._logger.debug(
            "f(exec): Stage ok",
            stage_class=class_to_path(self.current_stage_view.__class__),
        )
        if isinstance(self.current_stage_view, StageView):
            self.current_stage_view.cleanup()
        # setdefault stores the list in the session when the key is missing, so the
        # snapshot is not appended to a temporary list that is immediately discarded
        history = self.request.session.setdefault(SESSION_KEY_HISTORY, [])
        history.append(deepcopy(self.plan))
        self.plan.pop()
        self.request.session[SESSION_KEY_PLAN] = self.plan
        if self.plan.bindings:
            self._logger.debug(
                "f(exec): Continuing with next stage",
                remaining=len(self.plan.bindings),
            )
            kwargs = self.kwargs
            kwargs.update({"flow_slug": self.flow.slug})
            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
        # User passed all stages
        self._logger.debug(
            "f(exec): User passed all stages",
            context=cleanse_dict(self.plan.context),
        )
        return self._flow_done()

    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
        """Callback used by stages when data is correct but a policy denies access
        or the user account is disabled.

        When the current binding's invalid response action is RESTART or
        RESTART_WITH_CONTEXT the flow is restarted. Otherwise the flow is cancelled
        and the access denied stage is returned; `error_message` is passed on to it."""
        self._logger.debug("f(exec): Stage invalid")
        if self.current_binding and self.current_binding.invalid_response_action in [
            InvalidResponseAction.RESTART,
            InvalidResponseAction.RESTART_WITH_CONTEXT,
        ]:
            keep_context = (
                self.current_binding.invalid_response_action
                == InvalidResponseAction.RESTART_WITH_CONTEXT
            )
            self._logger.debug(
                "f(exec): Invalid response, restarting flow",
                keep_context=keep_context,
            )
            return self.restart_flow(keep_context)
        self.cancel()
        challenge_view = AccessDeniedStage(self, error_message)
        challenge_view.request = self.request
        return to_stage_response(self.request, challenge_view.get(self.request))

    def cancel(self):
        """Cancel current flow execution"""
        keys_to_delete = [
            SESSION_KEY_PLAN,
            SESSION_KEY_GET,
            # We might need the initial POST payloads for later requests
            # SESSION_KEY_POST,
            # We don't delete the history on purpose, as a user might
            # still be inspecting it.
            # It's only deleted on a fresh execution
            # SESSION_KEY_HISTORY,
        ]
        self._logger.debug("f(exec): cleaning up")
        for key in keys_to_delete:
            if key in self.request.session:
                del self.request.session[key]


class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Remove the active plan from the session, then redirect to a relative
        `next` URL if one was given, otherwise to the default invalidation flow."""
        if SESSION_KEY_PLAN in request.session:
            del request.session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        next_url = self.request.GET.get(NEXT_ARG_NAME)
        if next_url and not is_url_absolute(next_url):
            return redirect(next_url)
        return redirect("authentik_flows:default-invalidation")


class ToDefaultFlow(View):
    """Redirect to default flow matching by designation"""

    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Get a Flow by `**flow_filter` and check if the request from `request` can access it.

        Flows are tried ordered by slug; the first one whose policies pass is
        returned, or None when none pass."""
        flows = Flow.objects.filter(**flow_filter).order_by("slug")
        for flow in flows:
            engine = PolicyEngine(flow, request.user, request)
            engine.build()
            result = engine.result
            if result.passing:
                LOGGER.debug("flow_by_policy: flow passing", flow=flow)
                return flow
            LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages)
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Get a flow for the selected designation

        Raises Http404 when neither the brand nor the policy lookup yields a flow."""
        brand: Brand = request.brand
        flow = None
        # First, attempt to get default flow from brand
        if designation == FlowDesignation.AUTHENTICATION:
            flow = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            flow = brand.flow_invalidation
        if flow:
            return flow
        # If no flow was set, get the first based on slug and policy
        flow = ToDefaultFlow.flow_by_policy(request, designation=designation)
        if flow:
            return flow
        # If we still don't have a flow, 404
        raise Http404

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Resolve the default flow for `self.designation` and redirect to it,
        keeping the query string."""
        flow = ToDefaultFlow.get_flow(request, self.designation)
        # If user already has a pending plan, clear it so we don't have to later.
        if SESSION_KEY_PLAN in self.request.session:
            plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
            if plan.flow_pk != flow.pk.hex:
                LOGGER.warning(
                    "f(def): Found existing plan for other flow, deleting plan",
                    flow_slug=flow.slug,
                )
                del self.request.session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)


def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert normal HttpResponse into JSON Response

    Redirects become a RedirectChallenge (unless they point at the current URL),
    TemplateResponses and plain HttpResponses become a ShellChallenge carrying the
    body. Any other response is returned unchanged."""
    if (
        isinstance(source, HttpResponseRedirect)
        or source.status_code == HttpResponseRedirect.status_code
    ):
        redirect_url = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == redirect_url:
            return source
        LOGGER.debug(
            "converting to redirect challenge",
            to=str(redirect_url),
            current=request.path,
        )
        return HttpChallengeResponse(
            RedirectChallenge(
                {
                    "to": str(redirect_url),
                }
            )
        )
    if isinstance(source, TemplateResponse):
        return HttpChallengeResponse(
            ShellChallenge(
                {
                    "body": source.render().content.decode("utf-8"),
                }
            )
        )
    # Check for actual HttpResponse (without isinstance as we don't want to check inheritance)
    if source.__class__ == HttpResponse:
        return HttpChallengeResponse(
            ShellChallenge(
                {
                    "body": source.content.decode("utf-8"),
                }
            )
        )
    return source


class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Initiate planner for selected change flow and redirect to flow executor,
    or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Initiate planner for selected change flow and redirect to flow executor,
        or raise Http404 if no configure_flow has been set."""
        try:
            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
            raise Http404
        if not stage.configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=stage)
            raise Http404

        try:
            # The requesting user is the pending user of the configuration flow
            plan = FlowPlanner(stage.configure_flow).plan(
                request, {PLAN_CONTEXT_PENDING_USER: request.user}
            )
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        return plan.to_redirect(request, stage.configure_flow)
def challenge_types():
    """Build the lookup of component name to challenge class.

    Walks everything ``all_subclasses(Challenge)`` yields, leaves out
    ``WithUserInfoChallenge``, and keys each remaining class by the default
    value of its ``component`` field."""
    return {
        challenge_cls().fields["component"].default: challenge_cls
        for challenge_cls in all_subclasses(Challenge)
        if not challenge_cls == WithUserInfoChallenge
    }
This function returns a mapping which contains all subclasses of Challenge, and Challenge itself.
def challenge_response_types():
    """Build the lookup of component name to challenge response class.

    Each class yielded by ``all_subclasses(ChallengeResponse)`` is instantiated
    with ``stage=None`` and keyed by the default value of its ``component``
    field."""
    return {
        response_cls(stage=None).fields["component"].default: response_cls
        for response_cls in all_subclasses(ChallengeResponse)
    }
This function returns a mapping which contains all subclasses of ChallengeResponse, and ChallengeResponse itself.
class InvalidStageError(SentryIgnoredException):
    """Raised when a stage produces a challenge that is not valid"""
Error raised when a challenge from a stage is not valid
98@method_decorator(xframe_options_sameorigin, name="dispatch") 99class FlowExecutorView(APIView): 100 """Flow executor, passing requests to Stage Views""" 101 102 permission_classes = [AllowAny] 103 104 flow: Flow = None 105 106 plan: FlowPlan | None = None 107 current_binding: FlowStageBinding | None = None 108 current_stage: Stage 109 current_stage_view: View 110 111 _logger: BoundLogger 112 113 def setup(self, request: HttpRequest, flow_slug: str): 114 super().setup(request, flow_slug=flow_slug) 115 if not self.flow: 116 self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug) 117 self._logger = get_logger().bind(flow_slug=flow_slug) 118 set_tag("authentik.flow", self.flow.slug) 119 120 def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse: 121 """When a flow is non-applicable check if user is on the correct domain""" 122 if self.flow.denied_action in [ 123 FlowDeniedAction.CONTINUE, 124 FlowDeniedAction.MESSAGE_CONTINUE, 125 ]: 126 next_url = self.request.GET.get(NEXT_ARG_NAME) 127 if next_url and not is_url_absolute(next_url): 128 self._logger.debug("f(exec): Redirecting to next on fail") 129 return to_stage_response(self.request, redirect(next_url)) 130 if self.flow.denied_action == FlowDeniedAction.CONTINUE: 131 return to_stage_response( 132 self.request, redirect(reverse("authentik_core:root-redirect")) 133 ) 134 return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages)) 135 136 def _check_flow_token(self, key: str) -> FlowPlan | None: 137 """Check if the user is using a flow token to restore a plan""" 138 token: FlowToken | None = FlowToken.objects.filter(key=key).first() 139 if not token: 140 return None 141 plan = None 142 try: 143 plan = token.plan 144 except (AttributeError, EOFError, ImportError, IndexError) as exc: 145 LOGGER.warning("f(exec): Failed to restore token plan", exc=exc) 146 finally: 147 if token.revoke_on_execution: 148 token.delete() 149 if not isinstance(plan, 
FlowPlan): 150 return None 151 if existing_plan := self.request.session.get(SESSION_KEY_PLAN): 152 plan.context.update(existing_plan.context) 153 plan.context[PLAN_CONTEXT_IS_RESTORED] = token 154 self._logger.debug("f(exec): restored flow plan from token", plan=plan) 155 return plan 156 157 def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse: 158 with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span: 159 span.set_data("authentik Flow", self.flow.slug) 160 get_params = QueryDict(request.GET.get(QS_QUERY, "")) 161 if QS_KEY_TOKEN in get_params: 162 plan = self._check_flow_token(get_params[QS_KEY_TOKEN]) 163 if plan: 164 self.request.session[SESSION_KEY_PLAN] = plan 165 # Early check if there's an active Plan for the current session 166 if SESSION_KEY_PLAN in self.request.session: 167 self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN] 168 if self.plan.flow_pk != self.flow.pk.hex: 169 self._logger.warning( 170 "f(exec): Found existing plan for other flow, deleting plan", 171 other_flow=self.plan.flow_pk, 172 ) 173 # Existing plan is deleted from session and instance 174 self.plan = None 175 self.cancel() 176 else: 177 self._logger.debug("f(exec): Continuing existing plan") 178 179 # Initial flow request, check if we have an upstream query string passed in 180 request.session[SESSION_KEY_GET] = get_params 181 # Don't check session again as we've either already loaded the plan or we need to plan 182 if not self.plan: 183 request.session[SESSION_KEY_HISTORY] = [] 184 self._logger.debug("f(exec): No active Plan found, initiating planner") 185 try: 186 self.plan = self._initiate_plan() 187 except FlowNonApplicableException as exc: 188 # If we're this flow is for authentication and the user is already authenticated 189 # continue to the next URL 190 if ( 191 self.flow.designation == FlowDesignation.AUTHENTICATION 192 and self.request.user.is_authenticated 193 ): 194 return self._flow_done() 195 
self._logger.warning("f(exec): Flow not applicable to current user", exc=exc) 196 return self.handle_invalid_flow(exc) 197 except EmptyFlowException as exc: 198 self._logger.warning("f(exec): Flow is empty", exc=exc) 199 # To match behaviour with loading an empty flow plan from cache, 200 # we don't show an error message here, but rather call _flow_done() 201 return self._flow_done() 202 # We don't save the Plan after getting the next stage 203 # as it hasn't been successfully passed yet 204 try: 205 # This is the first time we actually access any attribute on the selected plan 206 # if the cached plan is from an older version, it might have different attributes 207 # in which case we just delete the plan and invalidate everything 208 next_binding = self.plan.next(self.request) 209 except Exception as exc: # noqa 210 self._logger.warning( 211 "f(exec): found incompatible flow plan, invalidating run", exc=exc 212 ) 213 keys = cache.keys(f"{CACHE_PREFIX}*") 214 cache.delete_many(keys) 215 return self.stage_invalid() 216 if not next_binding: 217 self._logger.debug("f(exec): no more stages, flow is done.") 218 return self._flow_done() 219 self.current_binding = next_binding 220 self.current_stage = next_binding.stage 221 self._logger.debug( 222 "f(exec): Current stage", 223 current_stage=self.current_stage, 224 flow_slug=self.flow.slug, 225 ) 226 try: 227 stage_cls = self.current_stage.view 228 except NotImplementedError as exc: 229 self._logger.debug("Error getting stage type", exc=exc) 230 return self.stage_invalid() 231 self.current_stage_view = stage_cls(self) 232 self.current_stage_view.args = self.args 233 self.current_stage_view.kwargs = self.kwargs 234 self.current_stage_view.request = request 235 try: 236 return super().dispatch(request) 237 except InvalidStageError as exc: 238 return self.stage_invalid(str(exc)) 239 240 def handle_exception(self, exc: Exception) -> HttpResponse: 241 """Handle exception in stage execution""" 242 if settings.DEBUG or 
settings.TEST: 243 raise exc 244 self._logger.warning(exc) 245 if not should_ignore_exception(exc): 246 capture_exception(exc) 247 Event.new( 248 action=EventAction.SYSTEM_EXCEPTION, 249 message="System exception during flow execution.", 250 ).with_exception(exc).from_http(self.request) 251 challenge = FlowErrorChallenge(self.request, exc) 252 challenge.is_valid(raise_exception=True) 253 return to_stage_response(self.request, HttpChallengeResponse(challenge)) 254 255 @extend_schema( 256 responses={ 257 200: PolymorphicProxySerializer( 258 component_name="ChallengeTypes", 259 serializers=challenge_types, 260 resource_type_field_name="component", 261 ), 262 }, 263 request=OpenApiTypes.NONE, 264 parameters=[ 265 OpenApiParameter( 266 name="query", 267 location=OpenApiParameter.QUERY, 268 required=True, 269 description="Querystring as received", 270 type=OpenApiTypes.STR, 271 ) 272 ], 273 operation_id="flows_executor_get", 274 ) 275 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 276 """Get the next pending challenge from the currently active flow.""" 277 class_path = class_to_path(self.current_stage_view.__class__) 278 self._logger.debug( 279 "f(exec): Passing GET", 280 view_class=class_path, 281 stage=self.current_stage, 282 ) 283 try: 284 with ( 285 start_span( 286 op="authentik.flow.executor.stage", 287 name=class_path, 288 ) as span, 289 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 290 method=request.method.upper(), 291 stage_type=class_path, 292 ).time(), 293 ): 294 span.set_data("Method", request.method.upper()) 295 span.set_data("authentik Stage", self.current_stage_view) 296 span.set_data("authentik Flow", self.flow.slug) 297 stage_response = self.current_stage_view.dispatch(request) 298 return to_stage_response(request, stage_response) 299 except Exception as exc: # noqa 300 return self.handle_exception(exc) 301 302 @extend_schema( 303 responses={ 304 200: PolymorphicProxySerializer( 305 component_name="ChallengeTypes", 306 
serializers=challenge_types, 307 resource_type_field_name="component", 308 ), 309 }, 310 request=PolymorphicProxySerializer( 311 component_name="FlowChallengeResponse", 312 serializers=challenge_response_types, 313 resource_type_field_name="component", 314 ), 315 parameters=[ 316 OpenApiParameter( 317 name="query", 318 location=OpenApiParameter.QUERY, 319 required=True, 320 description="Querystring as received", 321 type=OpenApiTypes.STR, 322 ) 323 ], 324 operation_id="flows_executor_solve", 325 ) 326 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 327 """Solve the previously retrieved challenge and advanced to the next stage.""" 328 class_path = class_to_path(self.current_stage_view.__class__) 329 self._logger.debug( 330 "f(exec): Passing POST", 331 view_class=class_path, 332 stage=self.current_stage, 333 ) 334 try: 335 with ( 336 start_span( 337 op="authentik.flow.executor.stage", 338 name=class_path, 339 ) as span, 340 HIST_FLOW_EXECUTION_STAGE_TIME.labels( 341 method=request.method.upper(), 342 stage_type=class_path, 343 ).time(), 344 ): 345 span.set_data("Method", request.method.upper()) 346 span.set_data("authentik Stage", self.current_stage_view) 347 span.set_data("authentik Flow", self.flow.slug) 348 stage_response = self.current_stage_view.dispatch(request) 349 return to_stage_response(request, stage_response) 350 except Exception as exc: # noqa 351 return self.handle_exception(exc) 352 353 def _initiate_plan(self) -> FlowPlan: 354 planner = FlowPlanner(self.flow) 355 plan = planner.plan(self.request) 356 self.request.session[SESSION_KEY_PLAN] = plan 357 try: 358 # Call the has_stages getter to check that 359 # there are no issues with the class we might've gotten 360 # from the cache. 
If there are errors, just delete all cached flows 361 _ = plan.has_stages 362 except Exception: # noqa 363 keys = cache.keys(f"{CACHE_PREFIX}*") 364 cache.delete_many(keys) 365 return self._initiate_plan() 366 return plan 367 368 def restart_flow(self, keep_context=False) -> HttpResponse: 369 """Restart the currently active flow, optionally keeping the current context""" 370 planner = FlowPlanner(self.flow) 371 planner.use_cache = False 372 default_context = None 373 if keep_context: 374 default_context = self.plan.context 375 try: 376 plan = planner.plan(self.request, default_context) 377 except FlowNonApplicableException as exc: 378 self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc) 379 return self.handle_invalid_flow(exc) 380 self.request.session[SESSION_KEY_PLAN] = plan 381 kwargs = self.kwargs 382 kwargs.update({"flow_slug": self.flow.slug}) 383 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 384 385 def _flow_done(self) -> HttpResponse: 386 """User Successfully passed all stages""" 387 # Since this is wrapped by the ExecutorShell, the next argument is saved in the session 388 # extract the next param before cancel as that cleans it 389 if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context: 390 # The context `redirect` variable can only be set by 391 # an expression policy or authentik itself, so we don't 392 # check if its an absolute URL or a relative one 393 self.cancel() 394 return to_stage_response( 395 self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT)) 396 ) 397 next_param = self.request.session.get(SESSION_KEY_GET, {}).get( 398 NEXT_ARG_NAME, "authentik_core:root-redirect" 399 ) 400 self.cancel() 401 if next_param and not is_url_absolute(next_param): 402 return to_stage_response(self.request, redirect_with_qs(next_param)) 403 return to_stage_response( 404 self.request, self.stage_invalid(error_message=_("Invalid next URL")) 405 ) 406 407 def stage_ok(self) -> 
HttpResponse: 408 """Callback called by stages upon successful completion. 409 Persists updated plan and context to session.""" 410 self._logger.debug( 411 "f(exec): Stage ok", 412 stage_class=class_to_path(self.current_stage_view.__class__), 413 ) 414 if isinstance(self.current_stage_view, StageView): 415 self.current_stage_view.cleanup() 416 self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan)) 417 self.plan.pop() 418 self.request.session[SESSION_KEY_PLAN] = self.plan 419 if self.plan.bindings: 420 self._logger.debug( 421 "f(exec): Continuing with next stage", 422 remaining=len(self.plan.bindings), 423 ) 424 kwargs = self.kwargs 425 kwargs.update({"flow_slug": self.flow.slug}) 426 return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs) 427 # User passed all stages 428 self._logger.debug( 429 "f(exec): User passed all stages", 430 context=cleanse_dict(self.plan.context), 431 ) 432 return self._flow_done() 433 434 def stage_invalid(self, error_message: str | None = None) -> HttpResponse: 435 """Callback used stage when data is correct but a policy denies access 436 or the user account is disabled. 
437 438 Optionally, an exception can be passed, which will be shown if the current user 439 is a superuser.""" 440 self._logger.debug("f(exec): Stage invalid") 441 if self.current_binding and self.current_binding.invalid_response_action in [ 442 InvalidResponseAction.RESTART, 443 InvalidResponseAction.RESTART_WITH_CONTEXT, 444 ]: 445 keep_context = ( 446 self.current_binding.invalid_response_action 447 == InvalidResponseAction.RESTART_WITH_CONTEXT 448 ) 449 self._logger.debug( 450 "f(exec): Invalid response, restarting flow", 451 keep_context=keep_context, 452 ) 453 return self.restart_flow(keep_context) 454 self.cancel() 455 challenge_view = AccessDeniedStage(self, error_message) 456 challenge_view.request = self.request 457 return to_stage_response(self.request, challenge_view.get(self.request)) 458 459 def cancel(self): 460 """Cancel current flow execution""" 461 keys_to_delete = [ 462 SESSION_KEY_PLAN, 463 SESSION_KEY_GET, 464 # We might need the initial POST payloads for later requests 465 # SESSION_KEY_POST, 466 # We don't delete the history on purpose, as a user might 467 # still be inspecting it. 468 # It's only deleted on a fresh executions 469 # SESSION_KEY_HISTORY, 470 ] 471 self._logger.debug("f(exec): cleaning up") 472 for key in keys_to_delete: 473 if key in self.request.session: 474 del self.request.session[key]
Flow executor, passing requests to Stage Views
def setup(self, request: HttpRequest, flow_slug: str):
    """Prepare the executor for a request.

    Runs the parent ``setup``, resolves the flow by ``flow_slug`` (raising a
    404 when no such flow exists) unless a flow is already set on the
    instance, and binds the slug to the logger and the tracing tag.
    """
    super().setup(request, flow_slug=flow_slug)
    if not self.flow:
        flow_queryset = Flow.objects.select_related()
        self.flow = get_object_or_404(flow_queryset, slug=flow_slug)
    # The logger is bound to the slug from the URL, the tag to the resolved flow
    bound_logger = get_logger().bind(flow_slug=flow_slug)
    self._logger = bound_logger
    set_tag("authentik.flow", self.flow.slug)
Initialize attributes shared by all view methods.
def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
    """Respond to a non-applicable flow according to the flow's ``denied_action``.

    For ``CONTINUE`` and ``MESSAGE_CONTINUE``, a non-absolute URL passed via
    ``NEXT_ARG_NAME`` in the query string is redirected to. ``CONTINUE``
    otherwise falls back to the root redirect. In every remaining case the
    messages carried by ``exc`` are handed to ``stage_invalid``.
    """
    denied_action = self.flow.denied_action
    if denied_action in (FlowDeniedAction.CONTINUE, FlowDeniedAction.MESSAGE_CONTINUE):
        next_url = self.request.GET.get(NEXT_ARG_NAME)
        # Only relative targets are followed
        if next_url and not is_url_absolute(next_url):
            self._logger.debug("f(exec): Redirecting to next on fail")
            return to_stage_response(self.request, redirect(next_url))
    if denied_action == FlowDeniedAction.CONTINUE:
        root_url = reverse("authentik_core:root-redirect")
        return to_stage_response(self.request, redirect(root_url))
    denied_response = self.stage_invalid(error_message=exc.messages)
    return to_stage_response(self.request, denied_response)
When a flow is non-applicable check if user is on the correct domain
def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
    """Resolve the plan for this request, pick the next stage and dispatch to it.

    A plan returned by ``_check_flow_token`` (when the wrapped query string
    carries a token) is stored in the session. A plan found in the session is
    reused only if it belongs to this flow; otherwise it is dropped and a new
    plan is initiated. The next binding of the plan determines the stage view
    that the parent ``dispatch`` then routes the request to.

    Returns the stage's response, the result of ``_flow_done`` when no stage
    is left or the flow is empty, or the result of ``stage_invalid`` /
    ``handle_invalid_flow`` on failure.
    """
    # NOTE(review): nesting reconstructed from a flattened listing; the whole
    # body is assumed to run inside the tracing span -- confirm against the file.
    with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
        span.set_data("authentik Flow", self.flow.slug)
        # The original query string is passed wrapped in a single GET parameter
        get_params = QueryDict(request.GET.get(QS_QUERY, ""))
        if QS_KEY_TOKEN in get_params:
            plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
            if plan:
                self.request.session[SESSION_KEY_PLAN] = plan
        # Early check if there's an active Plan for the current session
        if SESSION_KEY_PLAN in self.request.session:
            self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
            if self.plan.flow_pk != self.flow.pk.hex:
                self._logger.warning(
                    "f(exec): Found existing plan for other flow, deleting plan",
                    other_flow=self.plan.flow_pk,
                )
                # Existing plan is deleted from session and instance
                self.plan = None
                self.cancel()
            else:
                self._logger.debug("f(exec): Continuing existing plan")

        # Initial flow request, check if we have an upstream query string passed in
        request.session[SESSION_KEY_GET] = get_params
        # Don't check session again as we've either already loaded the plan or we need to plan
        if not self.plan:
            # A fresh execution starts with an empty history
            request.session[SESSION_KEY_HISTORY] = []
            self._logger.debug("f(exec): No active Plan found, initiating planner")
            try:
                self.plan = self._initiate_plan()
            except FlowNonApplicableException as exc:
                # If this flow is for authentication and the user is already authenticated,
                # continue to the next URL
                if (
                    self.flow.designation == FlowDesignation.AUTHENTICATION
                    and self.request.user.is_authenticated
                ):
                    return self._flow_done()
                self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
                return self.handle_invalid_flow(exc)
            except EmptyFlowException as exc:
                self._logger.warning("f(exec): Flow is empty", exc=exc)
                # To match behaviour with loading an empty flow plan from cache,
                # we don't show an error message here, but rather call _flow_done()
                return self._flow_done()
        # We don't save the Plan after getting the next stage
        # as it hasn't been successfully passed yet
        try:
            # This is the first time we actually access any attribute on the selected plan
            # if the cached plan is from an older version, it might have different attributes
            # in which case we just delete the plan and invalidate everything
            next_binding = self.plan.next(self.request)
        except Exception as exc:  # noqa
            self._logger.warning(
                "f(exec): found incompatible flow plan, invalidating run", exc=exc
            )
            # Drop every cached plan, not just the one for this flow
            keys = cache.keys(f"{CACHE_PREFIX}*")
            cache.delete_many(keys)
            return self.stage_invalid()
        if not next_binding:
            self._logger.debug("f(exec): no more stages, flow is done.")
            return self._flow_done()
        self.current_binding = next_binding
        self.current_stage = next_binding.stage
        self._logger.debug(
            "f(exec): Current stage",
            current_stage=self.current_stage,
            flow_slug=self.flow.slug,
        )
        try:
            stage_cls = self.current_stage.view
        except NotImplementedError as exc:
            self._logger.debug("Error getting stage type", exc=exc)
            return self.stage_invalid()
        # Instantiate the stage view and hand it the executor's request state
        self.current_stage_view = stage_cls(self)
        self.current_stage_view.args = self.args
        self.current_stage_view.kwargs = self.kwargs
        self.current_stage_view.request = request
        try:
            return super().dispatch(request)
        except InvalidStageError as exc:
            return self.stage_invalid(str(exc))
.dispatch() is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.
def handle_exception(self, exc: Exception) -> HttpResponse:
    """Turn an exception raised during stage execution into an error challenge.

    With ``settings.DEBUG`` or ``settings.TEST`` enabled the exception is
    re-raised unchanged. Otherwise it is logged, reported unless
    ``should_ignore_exception`` says to skip it, and a ``FlowErrorChallenge``
    response is returned.
    """
    if settings.DEBUG or settings.TEST:
        raise exc
    self._logger.warning(exc)
    # NOTE(review): nesting reconstructed from a flattened listing; the event is
    # assumed to be created only for exceptions that are not ignored -- confirm.
    must_report = not should_ignore_exception(exc)
    if must_report:
        capture_exception(exc)
        exception_event = Event.new(
            action=EventAction.SYSTEM_EXCEPTION,
            message="System exception during flow execution.",
        )
        exception_event.with_exception(exc).from_http(self.request)
    error_challenge = FlowErrorChallenge(self.request, exc)
    error_challenge.is_valid(raise_exception=True)
    challenge_response = HttpChallengeResponse(error_challenge)
    return to_stage_response(self.request, challenge_response)
Handle exception in stage execution
@extend_schema(
    responses={
        200: PolymorphicProxySerializer(
            component_name="ChallengeTypes",
            serializers=challenge_types,
            resource_type_field_name="component",
        ),
    },
    request=OpenApiTypes.NONE,
    parameters=[
        OpenApiParameter(
            name="query",
            location=OpenApiParameter.QUERY,
            required=True,
            description="Querystring as received",
            type=OpenApiTypes.STR,
        )
    ],
    operation_id="flows_executor_get",
)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Return the pending challenge of the currently active stage.

    The request is forwarded to the current stage view inside a tracing span
    and a stage-time histogram timer. The stage's response is converted with
    ``to_stage_response``; any exception is routed to ``handle_exception``.
    """
    stage_view = self.current_stage_view
    class_path = class_to_path(stage_view.__class__)
    self._logger.debug(
        "f(exec): Passing GET",
        view_class=class_path,
        stage=self.current_stage,
    )
    try:
        # NOTE(review): nesting reconstructed from a flattened listing; the
        # return is assumed to sit inside the timed block -- confirm.
        with start_span(op="authentik.flow.executor.stage", name=class_path) as span:
            with HIST_FLOW_EXECUTION_STAGE_TIME.labels(
                method=request.method.upper(),
                stage_type=class_path,
            ).time():
                span.set_data("Method", request.method.upper())
                span.set_data("authentik Stage", stage_view)
                span.set_data("authentik Flow", self.flow.slug)
                stage_response = stage_view.dispatch(request)
                return to_stage_response(request, stage_response)
    except Exception as exc:  # noqa
        return self.handle_exception(exc)
Get the next pending challenge from the currently active flow.
@extend_schema(
    responses={
        200: PolymorphicProxySerializer(
            component_name="ChallengeTypes",
            serializers=challenge_types,
            resource_type_field_name="component",
        ),
    },
    request=PolymorphicProxySerializer(
        component_name="FlowChallengeResponse",
        serializers=challenge_response_types,
        resource_type_field_name="component",
    ),
    parameters=[
        OpenApiParameter(
            name="query",
            location=OpenApiParameter.QUERY,
            required=True,
            description="Querystring as received",
            type=OpenApiTypes.STR,
        )
    ],
    operation_id="flows_executor_solve",
)
def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Solve the previously retrieved challenge and advance to the next stage.

    The request is forwarded to the current stage view inside a tracing span
    and a stage-time histogram timer. The stage's response is converted with
    ``to_stage_response``; any exception is routed to ``handle_exception``.
    """
    stage_view = self.current_stage_view
    class_path = class_to_path(stage_view.__class__)
    self._logger.debug(
        "f(exec): Passing POST",
        view_class=class_path,
        stage=self.current_stage,
    )
    try:
        # NOTE(review): nesting reconstructed from a flattened listing; the
        # return is assumed to sit inside the timed block -- confirm.
        with start_span(op="authentik.flow.executor.stage", name=class_path) as span:
            with HIST_FLOW_EXECUTION_STAGE_TIME.labels(
                method=request.method.upper(),
                stage_type=class_path,
            ).time():
                span.set_data("Method", request.method.upper())
                span.set_data("authentik Stage", stage_view)
                span.set_data("authentik Flow", self.flow.slug)
                stage_response = stage_view.dispatch(request)
                return to_stage_response(request, stage_response)
    except Exception as exc:  # noqa
        return self.handle_exception(exc)
Solve the previously retrieved challenge and advance to the next stage.
def restart_flow(self, keep_context=False) -> HttpResponse:
    """Plan the active flow again from scratch and redirect to the executor.

    The planner cache is bypassed. With ``keep_context`` set, the context of
    the current plan seeds the new one. If the flow turns out not to be
    applicable, the result of ``handle_invalid_flow`` is returned instead.
    """
    planner = FlowPlanner(self.flow)
    planner.use_cache = False
    seed_context = self.plan.context if keep_context else None
    try:
        fresh_plan = planner.plan(self.request, seed_context)
    except FlowNonApplicableException as exc:
        self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
        return self.handle_invalid_flow(exc)
    self.request.session[SESSION_KEY_PLAN] = fresh_plan
    # self.kwargs itself is updated, exactly as before
    kwargs = self.kwargs
    kwargs["flow_slug"] = self.flow.slug
    return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
Restart the currently active flow, optionally keeping the current context
def stage_ok(self) -> HttpResponse:
    """Callback invoked by a stage once it has completed successfully.

    Lets the stage clean up, records a snapshot of the plan in the session
    history, removes the finished stage from the plan and stores the plan in
    the session. Redirects to the executor while bindings remain; otherwise
    finishes the flow via ``_flow_done``.
    """
    stage_view = self.current_stage_view
    self._logger.debug(
        "f(exec): Stage ok",
        stage_class=class_to_path(stage_view.__class__),
    )
    if isinstance(stage_view, StageView):
        stage_view.cleanup()
    # If the session holds no history list, the snapshot goes to a throwaway list
    history = self.request.session.get(SESSION_KEY_HISTORY, [])
    history.append(deepcopy(self.plan))
    self.plan.pop()
    self.request.session[SESSION_KEY_PLAN] = self.plan
    if not self.plan.bindings:
        # User passed all stages
        self._logger.debug(
            "f(exec): User passed all stages",
            context=cleanse_dict(self.plan.context),
        )
        return self._flow_done()
    self._logger.debug(
        "f(exec): Continuing with next stage",
        remaining=len(self.plan.bindings),
    )
    kwargs = self.kwargs
    kwargs["flow_slug"] = self.flow.slug
    return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
Callback called by stages upon successful completion. Persists updated plan and context to session.
def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
    """Callback invoked when a stage cannot be passed.

    If the current binding's ``invalid_response_action`` is ``RESTART`` or
    ``RESTART_WITH_CONTEXT``, the flow is restarted, keeping the plan context
    for the latter. Otherwise the execution is cancelled and the response of
    ``AccessDeniedStage`` is returned, created with the optional
    ``error_message``.
    """
    self._logger.debug("f(exec): Stage invalid")
    binding = self.current_binding
    restart_actions = (
        InvalidResponseAction.RESTART,
        InvalidResponseAction.RESTART_WITH_CONTEXT,
    )
    if binding and binding.invalid_response_action in restart_actions:
        keep_context = (
            binding.invalid_response_action == InvalidResponseAction.RESTART_WITH_CONTEXT
        )
        self._logger.debug(
            "f(exec): Invalid response, restarting flow",
            keep_context=keep_context,
        )
        return self.restart_flow(keep_context)
    self.cancel()
    denied_view = AccessDeniedStage(self, error_message)
    denied_view.request = self.request
    denied_response = denied_view.get(self.request)
    return to_stage_response(self.request, denied_response)
Callback used by stages when data is correct but a policy denies access or the user account is disabled.
Optionally, an error message can be passed, which will be shown if the current user is a superuser.
def cancel(self):
    """Cancel the current flow execution by dropping its session state.

    Only the plan and the stored GET parameters are removed. The initial POST
    payload (``SESSION_KEY_POST``) may be needed by later requests, and the
    history (``SESSION_KEY_HISTORY``) is kept on purpose because a user might
    still be inspecting it; it is only reset on a fresh execution.
    """
    stale_keys = (SESSION_KEY_PLAN, SESSION_KEY_GET)
    self._logger.debug("f(exec): cleaning up")
    session = self.request.session
    for stale_key in stale_keys:
        if stale_key not in session:
            continue
        del session[stale_key]
Cancel current flow execution
class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Drop the active plan from the session and redirect away.

        A non-absolute URL passed via ``NEXT_ARG_NAME`` is followed; otherwise
        the user is sent to the default invalidation flow.
        """
        session = request.session
        # NOTE(review): nesting reconstructed from a flattened listing; the log
        # call is assumed to run only when a plan was removed -- confirm.
        if SESSION_KEY_PLAN in session:
            del session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        next_url = self.request.GET.get(NEXT_ARG_NAME)
        if not next_url or is_url_absolute(next_url):
            return redirect("authentik_flows:default-invalidation")
        return redirect(next_url)
View which cancels the currently active plan
def get(self, request: HttpRequest) -> HttpResponse:
    """Drop the active plan from the session and redirect away.

    A non-absolute URL passed via ``NEXT_ARG_NAME`` is followed; otherwise
    the user is sent to the default invalidation flow.
    """
    session = request.session
    # NOTE(review): nesting reconstructed from a flattened listing; the log
    # call is assumed to run only when a plan was removed -- confirm.
    if SESSION_KEY_PLAN in session:
        del session[SESSION_KEY_PLAN]
        LOGGER.debug("Canceled current plan")
    next_url = self.request.GET.get(NEXT_ARG_NAME)
    if not next_url or is_url_absolute(next_url):
        return redirect("authentik_flows:default-invalidation")
    return redirect(next_url)
View which cancels the currently active plan
class ToDefaultFlow(View):
    """Redirect to the default flow for a designation"""

    # Designation whose default flow this view redirects to
    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Return the first flow matching ``**flow_filter`` whose policies pass.

        Candidates are evaluated in slug order with a ``PolicyEngine`` for the
        requesting user. ``None`` is returned when no candidate passes.
        """
        candidates = Flow.objects.filter(**flow_filter).order_by("slug")
        for flow in candidates:
            engine = PolicyEngine(flow, request.user, request)
            engine.build()
            result = engine.result
            if not result.passing:
                LOGGER.warning(
                    "flow_by_policy: flow not passing", flow=flow, messages=result.messages
                )
                continue
            LOGGER.debug("flow_by_policy: flow passing", flow=flow)
            return flow
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Return the flow to use for ``designation``.

        The brand's configured authentication or invalidation flow is
        preferred; otherwise the first flow of that designation passing its
        policies is used. Raises ``Http404`` when neither yields a flow.
        """
        brand: Brand = request.brand
        # First, attempt to get default flow from brand
        if designation == FlowDesignation.AUTHENTICATION:
            brand_flow = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            brand_flow = brand.flow_invalidation
        else:
            brand_flow = None
        # If no flow was set, get the first based on slug and policy
        chosen = brand_flow or ToDefaultFlow.flow_by_policy(request, designation=designation)
        if not chosen:
            # If we still don't have a flow, 404
            raise Http404
        return chosen

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Redirect to the flow interface of the default flow.

        A plan pending in the session for a different flow is removed first,
        so it does not have to be cleared later.
        """
        flow = ToDefaultFlow.get_flow(request, self.designation)
        session = self.request.session
        if SESSION_KEY_PLAN in session:
            pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
            if pending_plan.flow_pk != flow.pk.hex:
                LOGGER.warning(
                    "f(def): Found existing plan for other flow, deleting plan",
                    flow_slug=flow.slug,
                )
                del session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
Redirect to default flow matching by designation
@staticmethod
def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
    """Return the first flow matching ``**flow_filter`` whose policies pass.

    Candidates are evaluated in slug order with a ``PolicyEngine`` for the
    requesting user. ``None`` is returned when no candidate passes.
    """
    candidates = Flow.objects.filter(**flow_filter).order_by("slug")
    for flow in candidates:
        engine = PolicyEngine(flow, request.user, request)
        engine.build()
        result = engine.result
        if not result.passing:
            LOGGER.warning(
                "flow_by_policy: flow not passing", flow=flow, messages=result.messages
            )
            continue
        LOGGER.debug("flow_by_policy: flow passing", flow=flow)
        return flow
    LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
    return None
Get a Flow by **flow_filter and check if the request from request can access it.
@staticmethod
def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
    """Return the flow to use for ``designation``.

    The brand's configured authentication or invalidation flow is preferred;
    otherwise the first flow of that designation passing its policies is
    used. Raises ``Http404`` when neither yields a flow.
    """
    brand: Brand = request.brand
    # First, attempt to get default flow from brand
    if designation == FlowDesignation.AUTHENTICATION:
        brand_flow = brand.flow_authentication
    elif designation == FlowDesignation.INVALIDATION:
        brand_flow = brand.flow_invalidation
    else:
        brand_flow = None
    # If no flow was set, get the first based on slug and policy
    chosen = brand_flow or ToDefaultFlow.flow_by_policy(request, designation=designation)
    if not chosen:
        # If we still don't have a flow, 404
        raise Http404
    return chosen
Get a flow for the selected designation
def dispatch(self, request: HttpRequest) -> HttpResponse:
    """Redirect to the flow interface of the default flow.

    A plan pending in the session for a different flow is removed first, so
    it does not have to be cleared later.
    """
    flow = ToDefaultFlow.get_flow(request, self.designation)
    session = self.request.session
    if SESSION_KEY_PLAN in session:
        pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
        if pending_plan.flow_pk != flow.pk.hex:
            LOGGER.warning(
                "f(def): Found existing plan for other flow, deleting plan",
                flow_slug=flow.slug,
            )
            del session[SESSION_KEY_PLAN]
    return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert a regular ``HttpResponse`` into a challenge response.

    Redirects become a ``RedirectChallenge``, except redirects to the URL
    currently being requested, which are returned untouched. Template
    responses and plain ``HttpResponse`` objects are wrapped in a
    ``ShellChallenge`` carrying their body. Anything else is passed through.
    """
    is_redirect = isinstance(source, HttpResponseRedirect) or (
        source.status_code == HttpResponseRedirect.status_code
    )
    if is_redirect:
        redirect_url = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == redirect_url:
            return source
        LOGGER.debug(
            "converting to redirect challenge",
            to=str(redirect_url),
            current=request.path,
        )
        redirect_challenge = RedirectChallenge({"to": str(redirect_url)})
        return HttpChallengeResponse(redirect_challenge)
    if isinstance(source, TemplateResponse):
        rendered_body = source.render().content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": rendered_body}))
    # Check for actual HttpResponse (without isinstance as we don't want to check inheritance)
    if source.__class__ == HttpResponse:
        plain_body = source.content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": plain_body}))
    return source
Convert normal HttpResponse into JSON Response
class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Start the configuration flow of a stage and redirect to the flow executor,
    or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Plan the stage's ``configure_flow`` for the requesting user and redirect to it.

        Raises ``Http404`` when the stage does not exist, is not a
        ``ConfigurableStage``, has no ``configure_flow`` set, or the flow is
        not applicable to the user.
        """
        try:
            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
            raise Http404
        if not stage.configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=stage)
            raise Http404

        # The requesting user is the pending user of the configuration flow
        initial_context = {PLAN_CONTEXT_PENDING_USER: request.user}
        try:
            plan = FlowPlanner(stage.configure_flow).plan(request, initial_context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        else:
            return plan.to_redirect(request, stage.configure_flow)
Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.
def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
    """Plan the stage's ``configure_flow`` for the requesting user and redirect to it.

    Raises ``Http404`` when the stage does not exist, is not a
    ``ConfigurableStage``, has no ``configure_flow`` set, or the flow is not
    applicable to the user.
    """
    try:
        stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
    except Stage.DoesNotExist as exc:
        raise Http404 from exc
    if not isinstance(stage, ConfigurableStage):
        LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
        raise Http404
    if not stage.configure_flow:
        LOGGER.debug("Stage has no configure_flow set", stage=stage)
        raise Http404

    # The requesting user is the pending user of the configuration flow
    initial_context = {PLAN_CONTEXT_PENDING_USER: request.user}
    try:
        plan = FlowPlanner(stage.configure_flow).plan(request, initial_context)
    except FlowNonApplicableException:
        LOGGER.warning("Flow not applicable to user")
        raise Http404 from None
    else:
        return plan.to_redirect(request, stage.configure_flow)
Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.