authentik.flows.views.executor

authentik multi-stage authentication engine

  1"""authentik multi-stage authentication engine"""
  2
  3from copy import deepcopy
  4
  5from django.conf import settings
  6from django.contrib.auth.mixins import LoginRequiredMixin
  7from django.core.cache import cache
  8from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect
  9from django.http.request import QueryDict
 10from django.shortcuts import get_object_or_404, redirect
 11from django.template.response import TemplateResponse
 12from django.urls import reverse
 13from django.utils.decorators import method_decorator
 14from django.utils.translation import gettext as _
 15from django.views.decorators.clickjacking import xframe_options_sameorigin
 16from django.views.generic import View
 17from drf_spectacular.types import OpenApiTypes
 18from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer, extend_schema
 19from rest_framework.permissions import AllowAny
 20from rest_framework.views import APIView
 21from sentry_sdk import capture_exception, start_span
 22from sentry_sdk.api import set_tag
 23from structlog.stdlib import BoundLogger, get_logger
 24
 25from authentik.brands.models import Brand
 26from authentik.events.models import Event, EventAction, cleanse_dict
 27from authentik.flows.apps import HIST_FLOW_EXECUTION_STAGE_TIME
 28from authentik.flows.challenge import (
 29    Challenge,
 30    ChallengeResponse,
 31    FlowErrorChallenge,
 32    HttpChallengeResponse,
 33    RedirectChallenge,
 34    ShellChallenge,
 35    WithUserInfoChallenge,
 36)
 37from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 38from authentik.flows.models import (
 39    ConfigurableStage,
 40    Flow,
 41    FlowDeniedAction,
 42    FlowDesignation,
 43    FlowStageBinding,
 44    FlowToken,
 45    InvalidResponseAction,
 46    Stage,
 47)
 48from authentik.flows.planner import (
 49    CACHE_PREFIX,
 50    PLAN_CONTEXT_IS_RESTORED,
 51    PLAN_CONTEXT_PENDING_USER,
 52    PLAN_CONTEXT_REDIRECT,
 53    FlowPlan,
 54    FlowPlanner,
 55)
 56from authentik.flows.stage import AccessDeniedStage, StageView
 57from authentik.lib.sentry import SentryIgnoredException, should_ignore_exception
 58from authentik.lib.utils.reflection import all_subclasses, class_to_path
 59from authentik.lib.utils.urls import is_url_absolute, redirect_with_qs
 60from authentik.policies.engine import PolicyEngine
 61
 62LOGGER = get_logger()
 63# Argument used to redirect user after login
 64NEXT_ARG_NAME = "next"
 65
 66SESSION_KEY_PLAN = "authentik/flows/plan"
 67SESSION_KEY_GET = "authentik/flows/get"
 68SESSION_KEY_POST = "authentik/flows/post"
 69SESSION_KEY_HISTORY = "authentik/flows/history"
 70QS_KEY_TOKEN = "flow_token"  # nosec
 71QS_QUERY = "query"
 72
 73
 74def challenge_types():
 75    """This function returns a mapping which contains all subclasses of challenges
 76    subclasses of Challenge, and Challenge itself."""
 77    mapping = {}
 78    for cls in all_subclasses(Challenge):
 79        if cls == WithUserInfoChallenge:
 80            continue
 81        mapping[cls().fields["component"].default] = cls
 82    return mapping
 83
 84
 85def challenge_response_types():
 86    """This function returns a mapping which contains all subclasses of challenges
 87    subclasses of Challenge, and Challenge itself."""
 88    mapping = {}
 89    for cls in all_subclasses(ChallengeResponse):
 90        mapping[cls(stage=None).fields["component"].default] = cls
 91    return mapping
 92
 93
class InvalidStageError(SentryIgnoredException):
    """Error raised when a challenge from a stage is not valid.

    ``FlowExecutorView.dispatch`` catches this exception and responds with
    ``stage_invalid``, using the exception text as the error message."""
 96
 97
 98@method_decorator(xframe_options_sameorigin, name="dispatch")
 99class FlowExecutorView(APIView):
100    """Flow executor, passing requests to Stage Views"""
101
102    permission_classes = [AllowAny]
103
104    flow: Flow = None
105
106    plan: FlowPlan | None = None
107    current_binding: FlowStageBinding | None = None
108    current_stage: Stage
109    current_stage_view: View
110
111    _logger: BoundLogger
112
113    def setup(self, request: HttpRequest, flow_slug: str):
114        super().setup(request, flow_slug=flow_slug)
115        if not self.flow:
116            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
117        self._logger = get_logger().bind(flow_slug=flow_slug)
118        set_tag("authentik.flow", self.flow.slug)
119
120    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
121        """When a flow is non-applicable check if user is on the correct domain"""
122        if self.flow.denied_action in [
123            FlowDeniedAction.CONTINUE,
124            FlowDeniedAction.MESSAGE_CONTINUE,
125        ]:
126            next_url = self.request.GET.get(NEXT_ARG_NAME)
127            if next_url and not is_url_absolute(next_url):
128                self._logger.debug("f(exec): Redirecting to next on fail")
129                return to_stage_response(self.request, redirect(next_url))
130        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
131            return to_stage_response(
132                self.request, redirect(reverse("authentik_core:root-redirect"))
133            )
134        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))
135
136    def _check_flow_token(self, key: str) -> FlowPlan | None:
137        """Check if the user is using a flow token to restore a plan"""
138        token: FlowToken | None = FlowToken.objects.filter(key=key).first()
139        if not token:
140            return None
141        plan = None
142        try:
143            plan = token.plan
144        except (AttributeError, EOFError, ImportError, IndexError) as exc:
145            LOGGER.warning("f(exec): Failed to restore token plan", exc=exc)
146        finally:
147            if token.revoke_on_execution:
148                token.delete()
149        if not isinstance(plan, FlowPlan):
150            return None
151        if existing_plan := self.request.session.get(SESSION_KEY_PLAN):
152            plan.context.update(existing_plan.context)
153        plan.context[PLAN_CONTEXT_IS_RESTORED] = token
154        self._logger.debug("f(exec): restored flow plan from token", plan=plan)
155        return plan
156
    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
        """Select the plan and the current stage for this request, then hand over
        to the parent `dispatch`.

        Steps, in order:
        1. Parse the `query` GET parameter as a query string; if it carries a flow
           token, try to restore a plan from it into the session.
        2. Load the plan from the session, discarding it if it belongs to another flow.
        3. If there is no plan, start a fresh history and run the planner.
        4. Ask the plan for the next stage binding and instantiate the stage's view.
        5. Delegate to the parent `dispatch`; an `InvalidStageError` raised there is
           turned into the `stage_invalid` response.
        """
        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
            span.set_data("authentik Flow", self.flow.slug)
            # The original query string is passed in as a single, encoded parameter
            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
            if QS_KEY_TOKEN in get_params:
                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
                if plan:
                    self.request.session[SESSION_KEY_PLAN] = plan
            # Early check if there's an active Plan for the current session
            if SESSION_KEY_PLAN in self.request.session:
                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
                if self.plan.flow_pk != self.flow.pk.hex:
                    self._logger.warning(
                        "f(exec): Found existing plan for other flow, deleting plan",
                        other_flow=self.plan.flow_pk,
                    )
                    # Existing plan is deleted from session and instance
                    self.plan = None
                    self.cancel()
                else:
                    self._logger.debug("f(exec): Continuing existing plan")

            # Initial flow request, check if we have an upstream query string passed in
            request.session[SESSION_KEY_GET] = get_params
            # Don't check session again as we've either already loaded the plan or we need to plan
            if not self.plan:
                request.session[SESSION_KEY_HISTORY] = []
                self._logger.debug("f(exec): No active Plan found, initiating planner")
                try:
                    self.plan = self._initiate_plan()
                except FlowNonApplicableException as exc:
                    # If this flow is for authentication and the user is already
                    # authenticated, continue to the next URL
                    if (
                        self.flow.designation == FlowDesignation.AUTHENTICATION
                        and self.request.user.is_authenticated
                    ):
                        return self._flow_done()
                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
                    return self.handle_invalid_flow(exc)
                except EmptyFlowException as exc:
                    self._logger.warning("f(exec): Flow is empty", exc=exc)
                    # To match behaviour with loading an empty flow plan from cache,
                    # we don't show an error message here, but rather call _flow_done()
                    return self._flow_done()
            # We don't save the Plan after getting the next stage
            # as it hasn't been successfully passed yet
            try:
                # This is the first time we actually access any attribute on the selected plan
                # if the cached plan is from an older version, it might have different attributes
                # in which case we just delete the plan and invalidate everything
                next_binding = self.plan.next(self.request)
            except Exception as exc:  # noqa
                self._logger.warning(
                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
                )
                # Drop every cached plan, not only the one for this flow
                keys = cache.keys(f"{CACHE_PREFIX}*")
                cache.delete_many(keys)
                return self.stage_invalid()
            if not next_binding:
                self._logger.debug("f(exec): no more stages, flow is done.")
                return self._flow_done()
            self.current_binding = next_binding
            self.current_stage = next_binding.stage
            self._logger.debug(
                "f(exec): Current stage",
                current_stage=self.current_stage,
                flow_slug=self.flow.slug,
            )
            try:
                stage_cls = self.current_stage.view
            except NotImplementedError as exc:
                self._logger.debug("Error getting stage type", exc=exc)
                return self.stage_invalid()
            # The stage view receives the executor itself plus the same args/kwargs/request
            self.current_stage_view = stage_cls(self)
            self.current_stage_view.args = self.args
            self.current_stage_view.kwargs = self.kwargs
            self.current_stage_view.request = request
            try:
                return super().dispatch(request)
            except InvalidStageError as exc:
                return self.stage_invalid(str(exc))
239
240    def handle_exception(self, exc: Exception) -> HttpResponse:
241        """Handle exception in stage execution"""
242        if settings.DEBUG or settings.TEST:
243            raise exc
244        self._logger.warning(exc)
245        if not should_ignore_exception(exc):
246            capture_exception(exc)
247            Event.new(
248                action=EventAction.SYSTEM_EXCEPTION,
249                message="System exception during flow execution.",
250            ).with_exception(exc).from_http(self.request)
251        challenge = FlowErrorChallenge(self.request, exc)
252        challenge.is_valid(raise_exception=True)
253        return to_stage_response(self.request, HttpChallengeResponse(challenge))
254
255    @extend_schema(
256        responses={
257            200: PolymorphicProxySerializer(
258                component_name="ChallengeTypes",
259                serializers=challenge_types,
260                resource_type_field_name="component",
261            ),
262        },
263        request=OpenApiTypes.NONE,
264        parameters=[
265            OpenApiParameter(
266                name="query",
267                location=OpenApiParameter.QUERY,
268                required=True,
269                description="Querystring as received",
270                type=OpenApiTypes.STR,
271            )
272        ],
273        operation_id="flows_executor_get",
274    )
275    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
276        """Get the next pending challenge from the currently active flow."""
277        class_path = class_to_path(self.current_stage_view.__class__)
278        self._logger.debug(
279            "f(exec): Passing GET",
280            view_class=class_path,
281            stage=self.current_stage,
282        )
283        try:
284            with (
285                start_span(
286                    op="authentik.flow.executor.stage",
287                    name=class_path,
288                ) as span,
289                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
290                    method=request.method.upper(),
291                    stage_type=class_path,
292                ).time(),
293            ):
294                span.set_data("Method", request.method.upper())
295                span.set_data("authentik Stage", self.current_stage_view)
296                span.set_data("authentik Flow", self.flow.slug)
297                stage_response = self.current_stage_view.dispatch(request)
298                return to_stage_response(request, stage_response)
299        except Exception as exc:  # noqa
300            return self.handle_exception(exc)
301
302    @extend_schema(
303        responses={
304            200: PolymorphicProxySerializer(
305                component_name="ChallengeTypes",
306                serializers=challenge_types,
307                resource_type_field_name="component",
308            ),
309        },
310        request=PolymorphicProxySerializer(
311            component_name="FlowChallengeResponse",
312            serializers=challenge_response_types,
313            resource_type_field_name="component",
314        ),
315        parameters=[
316            OpenApiParameter(
317                name="query",
318                location=OpenApiParameter.QUERY,
319                required=True,
320                description="Querystring as received",
321                type=OpenApiTypes.STR,
322            )
323        ],
324        operation_id="flows_executor_solve",
325    )
326    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
327        """Solve the previously retrieved challenge and advanced to the next stage."""
328        class_path = class_to_path(self.current_stage_view.__class__)
329        self._logger.debug(
330            "f(exec): Passing POST",
331            view_class=class_path,
332            stage=self.current_stage,
333        )
334        try:
335            with (
336                start_span(
337                    op="authentik.flow.executor.stage",
338                    name=class_path,
339                ) as span,
340                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
341                    method=request.method.upper(),
342                    stage_type=class_path,
343                ).time(),
344            ):
345                span.set_data("Method", request.method.upper())
346                span.set_data("authentik Stage", self.current_stage_view)
347                span.set_data("authentik Flow", self.flow.slug)
348                stage_response = self.current_stage_view.dispatch(request)
349                return to_stage_response(request, stage_response)
350        except Exception as exc:  # noqa
351            return self.handle_exception(exc)
352
353    def _initiate_plan(self) -> FlowPlan:
354        planner = FlowPlanner(self.flow)
355        plan = planner.plan(self.request)
356        self.request.session[SESSION_KEY_PLAN] = plan
357        try:
358            # Call the has_stages getter to check that
359            # there are no issues with the class we might've gotten
360            # from the cache. If there are errors, just delete all cached flows
361            _ = plan.has_stages
362        except Exception:  # noqa
363            keys = cache.keys(f"{CACHE_PREFIX}*")
364            cache.delete_many(keys)
365            return self._initiate_plan()
366        return plan
367
368    def restart_flow(self, keep_context=False) -> HttpResponse:
369        """Restart the currently active flow, optionally keeping the current context"""
370        planner = FlowPlanner(self.flow)
371        planner.use_cache = False
372        default_context = None
373        if keep_context:
374            default_context = self.plan.context
375        try:
376            plan = planner.plan(self.request, default_context)
377        except FlowNonApplicableException as exc:
378            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
379            return self.handle_invalid_flow(exc)
380        self.request.session[SESSION_KEY_PLAN] = plan
381        kwargs = self.kwargs
382        kwargs.update({"flow_slug": self.flow.slug})
383        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
384
385    def _flow_done(self) -> HttpResponse:
386        """User Successfully passed all stages"""
387        # Since this is wrapped by the ExecutorShell, the next argument is saved in the session
388        # extract the next param before cancel as that cleans it
389        if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context:
390            # The context `redirect` variable can only be set by
391            # an expression policy or authentik itself, so we don't
392            # check if its an absolute URL or a relative one
393            self.cancel()
394            return to_stage_response(
395                self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT))
396            )
397        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(
398            NEXT_ARG_NAME, "authentik_core:root-redirect"
399        )
400        self.cancel()
401        if next_param and not is_url_absolute(next_param):
402            return to_stage_response(self.request, redirect_with_qs(next_param))
403        return to_stage_response(
404            self.request, self.stage_invalid(error_message=_("Invalid next URL"))
405        )
406
407    def stage_ok(self) -> HttpResponse:
408        """Callback called by stages upon successful completion.
409        Persists updated plan and context to session."""
410        self._logger.debug(
411            "f(exec): Stage ok",
412            stage_class=class_to_path(self.current_stage_view.__class__),
413        )
414        if isinstance(self.current_stage_view, StageView):
415            self.current_stage_view.cleanup()
416        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
417        self.plan.pop()
418        self.request.session[SESSION_KEY_PLAN] = self.plan
419        if self.plan.bindings:
420            self._logger.debug(
421                "f(exec): Continuing with next stage",
422                remaining=len(self.plan.bindings),
423            )
424            kwargs = self.kwargs
425            kwargs.update({"flow_slug": self.flow.slug})
426            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
427        # User passed all stages
428        self._logger.debug(
429            "f(exec): User passed all stages",
430            context=cleanse_dict(self.plan.context),
431        )
432        return self._flow_done()
433
434    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
435        """Callback used stage when data is correct but a policy denies access
436        or the user account is disabled.
437
438        Optionally, an exception can be passed, which will be shown if the current user
439        is a superuser."""
440        self._logger.debug("f(exec): Stage invalid")
441        if self.current_binding and self.current_binding.invalid_response_action in [
442            InvalidResponseAction.RESTART,
443            InvalidResponseAction.RESTART_WITH_CONTEXT,
444        ]:
445            keep_context = (
446                self.current_binding.invalid_response_action
447                == InvalidResponseAction.RESTART_WITH_CONTEXT
448            )
449            self._logger.debug(
450                "f(exec): Invalid response, restarting flow",
451                keep_context=keep_context,
452            )
453            return self.restart_flow(keep_context)
454        self.cancel()
455        challenge_view = AccessDeniedStage(self, error_message)
456        challenge_view.request = self.request
457        return to_stage_response(self.request, challenge_view.get(self.request))
458
459    def cancel(self):
460        """Cancel current flow execution"""
461        keys_to_delete = [
462            SESSION_KEY_PLAN,
463            SESSION_KEY_GET,
464            # We might need the initial POST payloads for later requests
465            # SESSION_KEY_POST,
466            # We don't delete the history on purpose, as a user might
467            # still be inspecting it.
468            # It's only deleted on a fresh executions
469            # SESSION_KEY_HISTORY,
470        ]
471        self._logger.debug("f(exec): cleaning up")
472        for key in keys_to_delete:
473            if key in self.request.session:
474                del self.request.session[key]
475
476
class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Remove the active plan from the session, then redirect to the relative
        `next` URL if one was given, otherwise to the default invalidation flow."""
        if SESSION_KEY_PLAN in request.session:
            del request.session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        next_url = self.request.GET.get(NEXT_ARG_NAME)
        # Missing or absolute next URLs fall back to the default invalidation flow
        if not next_url or is_url_absolute(next_url):
            return redirect("authentik_flows:default-invalidation")
        return redirect(next_url)
489
490
class ToDefaultFlow(View):
    """Redirect to default flow matching by designation"""

    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Return the first flow matching `**flow_filter` (ordered by slug) whose
        policies pass for the user of `request`, or None if there is none."""
        for candidate in Flow.objects.filter(**flow_filter).order_by("slug"):
            engine = PolicyEngine(candidate, request.user, request)
            engine.build()
            policy_result = engine.result
            if not policy_result.passing:
                LOGGER.warning(
                    "flow_by_policy: flow not passing",
                    flow=candidate,
                    messages=policy_result.messages,
                )
                continue
            LOGGER.debug("flow_by_policy: flow passing", flow=candidate)
            return candidate
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Get a flow for the selected designation.

        The flow configured on the request's brand wins (authentication and
        invalidation designations only); otherwise the first flow passing its
        policies is used. Raises Http404 if neither yields a flow."""
        brand: Brand = request.brand
        flow = None
        # First, attempt to get default flow from brand
        if designation == FlowDesignation.AUTHENTICATION:
            flow = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            flow = brand.flow_invalidation
        if not flow:
            # If no flow was set, get the first based on slug and policy
            flow = ToDefaultFlow.flow_by_policy(request, designation=designation)
        if not flow:
            # If we still don't have a flow, 404
            raise Http404
        return flow

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Redirect to the flow interface of the default flow for `self.designation`,
        dropping a pending plan from the session if it belongs to a different flow."""
        flow = ToDefaultFlow.get_flow(request, self.designation)
        session = self.request.session
        # If user already has a pending plan, clear it so we don't have to later.
        if SESSION_KEY_PLAN in session:
            pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
            if pending_plan.flow_pk != flow.pk.hex:
                LOGGER.warning(
                    "f(def): Found existing plan for other flow, deleting plan",
                    flow_slug=flow.slug,
                )
                del session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
542
543
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert a regular HttpResponse into a challenge response.

    - Redirects become a RedirectChallenge, unless they point at the current full
      path, in which case `source` is returned unchanged.
    - A TemplateResponse is rendered and wrapped in a ShellChallenge.
    - A plain HttpResponse (exact class, not a subclass) is wrapped in a ShellChallenge.
    - Anything else is returned unchanged."""
    is_redirect = (
        isinstance(source, HttpResponseRedirect)
        or source.status_code == HttpResponseRedirect.status_code
    )
    if is_redirect:
        redirect_url = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == redirect_url:
            return source
        LOGGER.debug(
            "converting to redirect challenge",
            to=str(redirect_url),
            current=request.path,
        )
        redirect_challenge = RedirectChallenge({"to": str(redirect_url)})
        return HttpChallengeResponse(redirect_challenge)
    if isinstance(source, TemplateResponse):
        rendered_body = source.render().content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": rendered_body}))
    # Compare the class directly instead of isinstance, subclasses must not match
    if source.__class__ == HttpResponse:
        plain_body = source.content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": plain_body}))
    return source
584
585
class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Initiate planner for selected change flow and redirect to flow executor,
    or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Plan the configure flow of the stage `stage_uuid` for the requesting user
        and redirect into it.

        Raises Http404 if the stage does not exist, is not a ConfigurableStage, has no
        configure_flow set, or if the flow is not applicable to the user."""
        try:
            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
            raise Http404
        configure_flow = stage.configure_flow
        if not configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=stage)
            raise Http404

        # The requesting user is the pending user of the plan
        plan_context = {PLAN_CONTEXT_PENDING_USER: request.user}
        try:
            plan = FlowPlanner(configure_flow).plan(request, plan_context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        return plan.to_redirect(request, configure_flow)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
NEXT_ARG_NAME = 'next'
SESSION_KEY_PLAN = 'authentik/flows/plan'
SESSION_KEY_GET = 'authentik/flows/get'
SESSION_KEY_POST = 'authentik/flows/post'
SESSION_KEY_HISTORY = 'authentik/flows/history'
QS_KEY_TOKEN = 'flow_token'
QS_QUERY = 'query'
def challenge_types():
def challenge_types():
    """Return a mapping of component name to challenge class.

    Every class yielded by ``all_subclasses(Challenge)`` is instantiated once and
    keyed by the default value of its ``component`` field;
    ``WithUserInfoChallenge`` is left out of the mapping."""
    return {
        challenge_cls().fields["component"].default: challenge_cls
        for challenge_cls in all_subclasses(Challenge)
        if challenge_cls != WithUserInfoChallenge
    }

Returns a mapping from each challenge's default `component` value to its class, covering all subclasses of Challenge (and, according to the original docstring, Challenge itself); WithUserInfoChallenge is skipped.

def challenge_response_types():
def challenge_response_types():
    """Return a mapping of component name to challenge response class.

    Every class yielded by ``all_subclasses(ChallengeResponse)`` is instantiated
    with ``stage=None`` and keyed by the default value of its ``component`` field."""
    return {
        response_cls(stage=None).fields["component"].default: response_cls
        for response_cls in all_subclasses(ChallengeResponse)
    }

Returns a mapping from each challenge response's default `component` value to its class, covering all subclasses of ChallengeResponse.

class InvalidStageError(authentik.lib.sentry.SentryIgnoredException):
class InvalidStageError(SentryIgnoredException):
    """Error raised when a challenge from a stage is not valid.

    ``FlowExecutorView.dispatch`` catches this exception and responds with
    ``stage_invalid``, using the exception text as the error message."""

Error raised when a challenge from a stage is not valid

@method_decorator(xframe_options_sameorigin, name='dispatch')
class FlowExecutorView(rest_framework.views.APIView):
 99@method_decorator(xframe_options_sameorigin, name="dispatch")
100class FlowExecutorView(APIView):
101    """Flow executor, passing requests to Stage Views"""
102
103    permission_classes = [AllowAny]
104
105    flow: Flow = None
106
107    plan: FlowPlan | None = None
108    current_binding: FlowStageBinding | None = None
109    current_stage: Stage
110    current_stage_view: View
111
112    _logger: BoundLogger
113
114    def setup(self, request: HttpRequest, flow_slug: str):
115        super().setup(request, flow_slug=flow_slug)
116        if not self.flow:
117            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
118        self._logger = get_logger().bind(flow_slug=flow_slug)
119        set_tag("authentik.flow", self.flow.slug)
120
121    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
122        """When a flow is non-applicable check if user is on the correct domain"""
123        if self.flow.denied_action in [
124            FlowDeniedAction.CONTINUE,
125            FlowDeniedAction.MESSAGE_CONTINUE,
126        ]:
127            next_url = self.request.GET.get(NEXT_ARG_NAME)
128            if next_url and not is_url_absolute(next_url):
129                self._logger.debug("f(exec): Redirecting to next on fail")
130                return to_stage_response(self.request, redirect(next_url))
131        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
132            return to_stage_response(
133                self.request, redirect(reverse("authentik_core:root-redirect"))
134            )
135        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))
136
137    def _check_flow_token(self, key: str) -> FlowPlan | None:
138        """Check if the user is using a flow token to restore a plan"""
139        token: FlowToken | None = FlowToken.objects.filter(key=key).first()
140        if not token:
141            return None
142        plan = None
143        try:
144            plan = token.plan
145        except (AttributeError, EOFError, ImportError, IndexError) as exc:
146            LOGGER.warning("f(exec): Failed to restore token plan", exc=exc)
147        finally:
148            if token.revoke_on_execution:
149                token.delete()
150        if not isinstance(plan, FlowPlan):
151            return None
152        if existing_plan := self.request.session.get(SESSION_KEY_PLAN):
153            plan.context.update(existing_plan.context)
154        plan.context[PLAN_CONTEXT_IS_RESTORED] = token
155        self._logger.debug("f(exec): restored flow plan from token", plan=plan)
156        return plan
157
158    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
159        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
160            span.set_data("authentik Flow", self.flow.slug)
161            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
162            if QS_KEY_TOKEN in get_params:
163                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
164                if plan:
165                    self.request.session[SESSION_KEY_PLAN] = plan
166            # Early check if there's an active Plan for the current session
167            if SESSION_KEY_PLAN in self.request.session:
168                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
169                if self.plan.flow_pk != self.flow.pk.hex:
170                    self._logger.warning(
171                        "f(exec): Found existing plan for other flow, deleting plan",
172                        other_flow=self.plan.flow_pk,
173                    )
174                    # Existing plan is deleted from session and instance
175                    self.plan = None
176                    self.cancel()
177                else:
178                    self._logger.debug("f(exec): Continuing existing plan")
179
180            # Initial flow request, check if we have an upstream query string passed in
181            request.session[SESSION_KEY_GET] = get_params
182            # Don't check session again as we've either already loaded the plan or we need to plan
183            if not self.plan:
184                request.session[SESSION_KEY_HISTORY] = []
185                self._logger.debug("f(exec): No active Plan found, initiating planner")
186                try:
187                    self.plan = self._initiate_plan()
188                except FlowNonApplicableException as exc:
189                    # If we're this flow is for authentication and the user is already authenticated
190                    # continue to the next URL
191                    if (
192                        self.flow.designation == FlowDesignation.AUTHENTICATION
193                        and self.request.user.is_authenticated
194                    ):
195                        return self._flow_done()
196                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
197                    return self.handle_invalid_flow(exc)
198                except EmptyFlowException as exc:
199                    self._logger.warning("f(exec): Flow is empty", exc=exc)
200                    # To match behaviour with loading an empty flow plan from cache,
201                    # we don't show an error message here, but rather call _flow_done()
202                    return self._flow_done()
203            # We don't save the Plan after getting the next stage
204            # as it hasn't been successfully passed yet
205            try:
206                # This is the first time we actually access any attribute on the selected plan
207                # if the cached plan is from an older version, it might have different attributes
208                # in which case we just delete the plan and invalidate everything
209                next_binding = self.plan.next(self.request)
210            except Exception as exc:  # noqa
211                self._logger.warning(
212                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
213                )
214                keys = cache.keys(f"{CACHE_PREFIX}*")
215                cache.delete_many(keys)
216                return self.stage_invalid()
217            if not next_binding:
218                self._logger.debug("f(exec): no more stages, flow is done.")
219                return self._flow_done()
220            self.current_binding = next_binding
221            self.current_stage = next_binding.stage
222            self._logger.debug(
223                "f(exec): Current stage",
224                current_stage=self.current_stage,
225                flow_slug=self.flow.slug,
226            )
227            try:
228                stage_cls = self.current_stage.view
229            except NotImplementedError as exc:
230                self._logger.debug("Error getting stage type", exc=exc)
231                return self.stage_invalid()
232            self.current_stage_view = stage_cls(self)
233            self.current_stage_view.args = self.args
234            self.current_stage_view.kwargs = self.kwargs
235            self.current_stage_view.request = request
236            try:
237                return super().dispatch(request)
238            except InvalidStageError as exc:
239                return self.stage_invalid(str(exc))
240
241    def handle_exception(self, exc: Exception) -> HttpResponse:
242        """Handle exception in stage execution"""
243        if settings.DEBUG or settings.TEST:
244            raise exc
245        self._logger.warning(exc)
246        if not should_ignore_exception(exc):
247            capture_exception(exc)
248            Event.new(
249                action=EventAction.SYSTEM_EXCEPTION,
250                message="System exception during flow execution.",
251            ).with_exception(exc).from_http(self.request)
252        challenge = FlowErrorChallenge(self.request, exc)
253        challenge.is_valid(raise_exception=True)
254        return to_stage_response(self.request, HttpChallengeResponse(challenge))
255
256    @extend_schema(
257        responses={
258            200: PolymorphicProxySerializer(
259                component_name="ChallengeTypes",
260                serializers=challenge_types,
261                resource_type_field_name="component",
262            ),
263        },
264        request=OpenApiTypes.NONE,
265        parameters=[
266            OpenApiParameter(
267                name="query",
268                location=OpenApiParameter.QUERY,
269                required=True,
270                description="Querystring as received",
271                type=OpenApiTypes.STR,
272            )
273        ],
274        operation_id="flows_executor_get",
275    )
276    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
277        """Get the next pending challenge from the currently active flow."""
278        class_path = class_to_path(self.current_stage_view.__class__)
279        self._logger.debug(
280            "f(exec): Passing GET",
281            view_class=class_path,
282            stage=self.current_stage,
283        )
284        try:
285            with (
286                start_span(
287                    op="authentik.flow.executor.stage",
288                    name=class_path,
289                ) as span,
290                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
291                    method=request.method.upper(),
292                    stage_type=class_path,
293                ).time(),
294            ):
295                span.set_data("Method", request.method.upper())
296                span.set_data("authentik Stage", self.current_stage_view)
297                span.set_data("authentik Flow", self.flow.slug)
298                stage_response = self.current_stage_view.dispatch(request)
299                return to_stage_response(request, stage_response)
300        except Exception as exc:  # noqa
301            return self.handle_exception(exc)
302
303    @extend_schema(
304        responses={
305            200: PolymorphicProxySerializer(
306                component_name="ChallengeTypes",
307                serializers=challenge_types,
308                resource_type_field_name="component",
309            ),
310        },
311        request=PolymorphicProxySerializer(
312            component_name="FlowChallengeResponse",
313            serializers=challenge_response_types,
314            resource_type_field_name="component",
315        ),
316        parameters=[
317            OpenApiParameter(
318                name="query",
319                location=OpenApiParameter.QUERY,
320                required=True,
321                description="Querystring as received",
322                type=OpenApiTypes.STR,
323            )
324        ],
325        operation_id="flows_executor_solve",
326    )
327    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
328        """Solve the previously retrieved challenge and advanced to the next stage."""
329        class_path = class_to_path(self.current_stage_view.__class__)
330        self._logger.debug(
331            "f(exec): Passing POST",
332            view_class=class_path,
333            stage=self.current_stage,
334        )
335        try:
336            with (
337                start_span(
338                    op="authentik.flow.executor.stage",
339                    name=class_path,
340                ) as span,
341                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
342                    method=request.method.upper(),
343                    stage_type=class_path,
344                ).time(),
345            ):
346                span.set_data("Method", request.method.upper())
347                span.set_data("authentik Stage", self.current_stage_view)
348                span.set_data("authentik Flow", self.flow.slug)
349                stage_response = self.current_stage_view.dispatch(request)
350                return to_stage_response(request, stage_response)
351        except Exception as exc:  # noqa
352            return self.handle_exception(exc)
353
354    def _initiate_plan(self) -> FlowPlan:
355        planner = FlowPlanner(self.flow)
356        plan = planner.plan(self.request)
357        self.request.session[SESSION_KEY_PLAN] = plan
358        try:
359            # Call the has_stages getter to check that
360            # there are no issues with the class we might've gotten
361            # from the cache. If there are errors, just delete all cached flows
362            _ = plan.has_stages
363        except Exception:  # noqa
364            keys = cache.keys(f"{CACHE_PREFIX}*")
365            cache.delete_many(keys)
366            return self._initiate_plan()
367        return plan
368
369    def restart_flow(self, keep_context=False) -> HttpResponse:
370        """Restart the currently active flow, optionally keeping the current context"""
371        planner = FlowPlanner(self.flow)
372        planner.use_cache = False
373        default_context = None
374        if keep_context:
375            default_context = self.plan.context
376        try:
377            plan = planner.plan(self.request, default_context)
378        except FlowNonApplicableException as exc:
379            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
380            return self.handle_invalid_flow(exc)
381        self.request.session[SESSION_KEY_PLAN] = plan
382        kwargs = self.kwargs
383        kwargs.update({"flow_slug": self.flow.slug})
384        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
385
386    def _flow_done(self) -> HttpResponse:
387        """User Successfully passed all stages"""
388        # Since this is wrapped by the ExecutorShell, the next argument is saved in the session
389        # extract the next param before cancel as that cleans it
390        if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context:
391            # The context `redirect` variable can only be set by
392            # an expression policy or authentik itself, so we don't
393            # check if its an absolute URL or a relative one
394            self.cancel()
395            return to_stage_response(
396                self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT))
397            )
398        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(
399            NEXT_ARG_NAME, "authentik_core:root-redirect"
400        )
401        self.cancel()
402        if next_param and not is_url_absolute(next_param):
403            return to_stage_response(self.request, redirect_with_qs(next_param))
404        return to_stage_response(
405            self.request, self.stage_invalid(error_message=_("Invalid next URL"))
406        )
407
408    def stage_ok(self) -> HttpResponse:
409        """Callback called by stages upon successful completion.
410        Persists updated plan and context to session."""
411        self._logger.debug(
412            "f(exec): Stage ok",
413            stage_class=class_to_path(self.current_stage_view.__class__),
414        )
415        if isinstance(self.current_stage_view, StageView):
416            self.current_stage_view.cleanup()
417        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
418        self.plan.pop()
419        self.request.session[SESSION_KEY_PLAN] = self.plan
420        if self.plan.bindings:
421            self._logger.debug(
422                "f(exec): Continuing with next stage",
423                remaining=len(self.plan.bindings),
424            )
425            kwargs = self.kwargs
426            kwargs.update({"flow_slug": self.flow.slug})
427            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
428        # User passed all stages
429        self._logger.debug(
430            "f(exec): User passed all stages",
431            context=cleanse_dict(self.plan.context),
432        )
433        return self._flow_done()
434
435    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
436        """Callback used stage when data is correct but a policy denies access
437        or the user account is disabled.
438
439        Optionally, an exception can be passed, which will be shown if the current user
440        is a superuser."""
441        self._logger.debug("f(exec): Stage invalid")
442        if self.current_binding and self.current_binding.invalid_response_action in [
443            InvalidResponseAction.RESTART,
444            InvalidResponseAction.RESTART_WITH_CONTEXT,
445        ]:
446            keep_context = (
447                self.current_binding.invalid_response_action
448                == InvalidResponseAction.RESTART_WITH_CONTEXT
449            )
450            self._logger.debug(
451                "f(exec): Invalid response, restarting flow",
452                keep_context=keep_context,
453            )
454            return self.restart_flow(keep_context)
455        self.cancel()
456        challenge_view = AccessDeniedStage(self, error_message)
457        challenge_view.request = self.request
458        return to_stage_response(self.request, challenge_view.get(self.request))
459
460    def cancel(self):
461        """Cancel current flow execution"""
462        keys_to_delete = [
463            SESSION_KEY_PLAN,
464            SESSION_KEY_GET,
465            # We might need the initial POST payloads for later requests
466            # SESSION_KEY_POST,
467            # We don't delete the history on purpose, as a user might
468            # still be inspecting it.
469            # It's only deleted on a fresh executions
470            # SESSION_KEY_HISTORY,
471        ]
472        self._logger.debug("f(exec): cleaning up")
473        for key in keys_to_delete:
474            if key in self.request.session:
475                del self.request.session[key]

Flow executor, passing requests to Stage Views

permission_classes = [<class 'rest_framework.permissions.AllowAny'>]
plan: authentik.flows.planner.FlowPlan | None = None
current_binding: authentik.flows.models.FlowStageBinding | None = None
current_stage_view: django.views.generic.base.View
def setup(self, request: django.http.request.HttpRequest, flow_slug: str):
114    def setup(self, request: HttpRequest, flow_slug: str):
115        super().setup(request, flow_slug=flow_slug)
116        if not self.flow:
117            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
118        self._logger = get_logger().bind(flow_slug=flow_slug)
119        set_tag("authentik.flow", self.flow.slug)

Initialize attributes shared by all view methods.

def handle_invalid_flow( self, exc: authentik.flows.exceptions.FlowNonApplicableException) -> django.http.response.HttpResponse:
121    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
122        """When a flow is non-applicable check if user is on the correct domain"""
123        if self.flow.denied_action in [
124            FlowDeniedAction.CONTINUE,
125            FlowDeniedAction.MESSAGE_CONTINUE,
126        ]:
127            next_url = self.request.GET.get(NEXT_ARG_NAME)
128            if next_url and not is_url_absolute(next_url):
129                self._logger.debug("f(exec): Redirecting to next on fail")
130                return to_stage_response(self.request, redirect(next_url))
131        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
132            return to_stage_response(
133                self.request, redirect(reverse("authentik_core:root-redirect"))
134            )
135        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))

When a flow is non-applicable check if user is on the correct domain

def dispatch( self, request: django.http.request.HttpRequest, flow_slug: str) -> django.http.response.HttpResponse:
    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
        """Load or create the flow plan, select the next stage and dispatch to it.

        Steps, in order:
        1. Restore a plan from a flow token found in the upstream query string.
        2. Pick up the plan stored in the session, discarding it when it belongs
           to a different flow.
        3. Without a plan, reset the history and ask the planner for a new one.
        4. Advance the plan to its next binding and instantiate that stage's view.
        5. Hand over to the parent ``dispatch`` so ``get``/``post`` run the stage.

        Returns early with the flow-done or invalid response when no plan can be
        created, the plan cannot be advanced, or no stage is left.
        """
        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
            span.set_data("authentik Flow", self.flow.slug)
            # The original query string is passed along in the `query` parameter
            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
            if QS_KEY_TOKEN in get_params:
                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
                if plan:
                    self.request.session[SESSION_KEY_PLAN] = plan
            # Early check if there's an active Plan for the current session
            if SESSION_KEY_PLAN in self.request.session:
                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
                if self.plan.flow_pk != self.flow.pk.hex:
                    self._logger.warning(
                        "f(exec): Found existing plan for other flow, deleting plan",
                        other_flow=self.plan.flow_pk,
                    )
                    # Existing plan is deleted from session and instance
                    self.plan = None
                    self.cancel()
                else:
                    self._logger.debug("f(exec): Continuing existing plan")

            # Initial flow request, check if we have an upstream query string passed in
            request.session[SESSION_KEY_GET] = get_params
            # Don't check session again as we've either already loaded the plan or we need to plan
            if not self.plan:
                # A fresh execution starts with an empty history
                request.session[SESSION_KEY_HISTORY] = []
                self._logger.debug("f(exec): No active Plan found, initiating planner")
                try:
                    self.plan = self._initiate_plan()
                except FlowNonApplicableException as exc:
                    # If this flow is for authentication and the user is already
                    # authenticated, continue to the next URL
                    if (
                        self.flow.designation == FlowDesignation.AUTHENTICATION
                        and self.request.user.is_authenticated
                    ):
                        return self._flow_done()
                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
                    return self.handle_invalid_flow(exc)
                except EmptyFlowException as exc:
                    self._logger.warning("f(exec): Flow is empty", exc=exc)
                    # To match behaviour with loading an empty flow plan from cache,
                    # we don't show an error message here, but rather call _flow_done()
                    return self._flow_done()
            # We don't save the Plan after getting the next stage
            # as it hasn't been successfully passed yet
            try:
                # This is the first time we actually access any attribute on the selected plan
                # if the cached plan is from an older version, it might have different attributes
                # in which case we just delete the plan and invalidate everything
                next_binding = self.plan.next(self.request)
            except Exception as exc:  # noqa
                self._logger.warning(
                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
                )
                # Drop every cached plan, not only the one that failed
                keys = cache.keys(f"{CACHE_PREFIX}*")
                cache.delete_many(keys)
                return self.stage_invalid()
            if not next_binding:
                self._logger.debug("f(exec): no more stages, flow is done.")
                return self._flow_done()
            self.current_binding = next_binding
            self.current_stage = next_binding.stage
            self._logger.debug(
                "f(exec): Current stage",
                current_stage=self.current_stage,
                flow_slug=self.flow.slug,
            )
            try:
                stage_cls = self.current_stage.view
            except NotImplementedError as exc:
                self._logger.debug("Error getting stage type", exc=exc)
                return self.stage_invalid()
            # The stage view gets the executor itself plus the same args/kwargs/request
            self.current_stage_view = stage_cls(self)
            self.current_stage_view.args = self.args
            self.current_stage_view.kwargs = self.kwargs
            self.current_stage_view.request = request
            try:
                return super().dispatch(request)
            except InvalidStageError as exc:
                # The exception text becomes the error message of the invalid response
                return self.stage_invalid(str(exc))

.dispatch() is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.

def handle_exception(self, exc: Exception) -> django.http.response.HttpResponse:
241    def handle_exception(self, exc: Exception) -> HttpResponse:
242        """Handle exception in stage execution"""
243        if settings.DEBUG or settings.TEST:
244            raise exc
245        self._logger.warning(exc)
246        if not should_ignore_exception(exc):
247            capture_exception(exc)
248            Event.new(
249                action=EventAction.SYSTEM_EXCEPTION,
250                message="System exception during flow execution.",
251            ).with_exception(exc).from_http(self.request)
252        challenge = FlowErrorChallenge(self.request, exc)
253        challenge.is_valid(raise_exception=True)
254        return to_stage_response(self.request, HttpChallengeResponse(challenge))

Handle exception in stage execution

@extend_schema(responses={200: PolymorphicProxySerializer(component_name='ChallengeTypes', serializers=challenge_types, resource_type_field_name='component')}, request=OpenApiTypes.NONE, parameters=[OpenApiParameter(name='query', location=OpenApiParameter.QUERY, required=True, description='Querystring as received', type=OpenApiTypes.STR)], operation_id='flows_executor_get')
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
256    @extend_schema(
257        responses={
258            200: PolymorphicProxySerializer(
259                component_name="ChallengeTypes",
260                serializers=challenge_types,
261                resource_type_field_name="component",
262            ),
263        },
264        request=OpenApiTypes.NONE,
265        parameters=[
266            OpenApiParameter(
267                name="query",
268                location=OpenApiParameter.QUERY,
269                required=True,
270                description="Querystring as received",
271                type=OpenApiTypes.STR,
272            )
273        ],
274        operation_id="flows_executor_get",
275    )
276    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
277        """Get the next pending challenge from the currently active flow."""
278        class_path = class_to_path(self.current_stage_view.__class__)
279        self._logger.debug(
280            "f(exec): Passing GET",
281            view_class=class_path,
282            stage=self.current_stage,
283        )
284        try:
285            with (
286                start_span(
287                    op="authentik.flow.executor.stage",
288                    name=class_path,
289                ) as span,
290                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
291                    method=request.method.upper(),
292                    stage_type=class_path,
293                ).time(),
294            ):
295                span.set_data("Method", request.method.upper())
296                span.set_data("authentik Stage", self.current_stage_view)
297                span.set_data("authentik Flow", self.flow.slug)
298                stage_response = self.current_stage_view.dispatch(request)
299                return to_stage_response(request, stage_response)
300        except Exception as exc:  # noqa
301            return self.handle_exception(exc)

Get the next pending challenge from the currently active flow.

@extend_schema(responses={200: PolymorphicProxySerializer(component_name='ChallengeTypes', serializers=challenge_types, resource_type_field_name='component')}, request=PolymorphicProxySerializer(component_name='FlowChallengeResponse', serializers=challenge_response_types, resource_type_field_name='component'), parameters=[OpenApiParameter(name='query', location=OpenApiParameter.QUERY, required=True, description='Querystring as received', type=OpenApiTypes.STR)], operation_id='flows_executor_solve')
def post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
303    @extend_schema(
304        responses={
305            200: PolymorphicProxySerializer(
306                component_name="ChallengeTypes",
307                serializers=challenge_types,
308                resource_type_field_name="component",
309            ),
310        },
311        request=PolymorphicProxySerializer(
312            component_name="FlowChallengeResponse",
313            serializers=challenge_response_types,
314            resource_type_field_name="component",
315        ),
316        parameters=[
317            OpenApiParameter(
318                name="query",
319                location=OpenApiParameter.QUERY,
320                required=True,
321                description="Querystring as received",
322                type=OpenApiTypes.STR,
323            )
324        ],
325        operation_id="flows_executor_solve",
326    )
327    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
328        """Solve the previously retrieved challenge and advanced to the next stage."""
329        class_path = class_to_path(self.current_stage_view.__class__)
330        self._logger.debug(
331            "f(exec): Passing POST",
332            view_class=class_path,
333            stage=self.current_stage,
334        )
335        try:
336            with (
337                start_span(
338                    op="authentik.flow.executor.stage",
339                    name=class_path,
340                ) as span,
341                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
342                    method=request.method.upper(),
343                    stage_type=class_path,
344                ).time(),
345            ):
346                span.set_data("Method", request.method.upper())
347                span.set_data("authentik Stage", self.current_stage_view)
348                span.set_data("authentik Flow", self.flow.slug)
349                stage_response = self.current_stage_view.dispatch(request)
350                return to_stage_response(request, stage_response)
351        except Exception as exc:  # noqa
352            return self.handle_exception(exc)

Solve the previously retrieved challenge and advance to the next stage.

def restart_flow(self, keep_context=False) -> django.http.response.HttpResponse:
369    def restart_flow(self, keep_context=False) -> HttpResponse:
370        """Restart the currently active flow, optionally keeping the current context"""
371        planner = FlowPlanner(self.flow)
372        planner.use_cache = False
373        default_context = None
374        if keep_context:
375            default_context = self.plan.context
376        try:
377            plan = planner.plan(self.request, default_context)
378        except FlowNonApplicableException as exc:
379            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
380            return self.handle_invalid_flow(exc)
381        self.request.session[SESSION_KEY_PLAN] = plan
382        kwargs = self.kwargs
383        kwargs.update({"flow_slug": self.flow.slug})
384        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)

Restart the currently active flow, optionally keeping the current context

def stage_ok(self) -> django.http.response.HttpResponse:
408    def stage_ok(self) -> HttpResponse:
409        """Callback called by stages upon successful completion.
410        Persists updated plan and context to session."""
411        self._logger.debug(
412            "f(exec): Stage ok",
413            stage_class=class_to_path(self.current_stage_view.__class__),
414        )
415        if isinstance(self.current_stage_view, StageView):
416            self.current_stage_view.cleanup()
417        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
418        self.plan.pop()
419        self.request.session[SESSION_KEY_PLAN] = self.plan
420        if self.plan.bindings:
421            self._logger.debug(
422                "f(exec): Continuing with next stage",
423                remaining=len(self.plan.bindings),
424            )
425            kwargs = self.kwargs
426            kwargs.update({"flow_slug": self.flow.slug})
427            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
428        # User passed all stages
429        self._logger.debug(
430            "f(exec): User passed all stages",
431            context=cleanse_dict(self.plan.context),
432        )
433        return self._flow_done()

Callback called by stages upon successful completion. Persists updated plan and context to session.

def stage_invalid( self, error_message: str | None = None) -> django.http.response.HttpResponse:
435    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
436        """Callback used stage when data is correct but a policy denies access
437        or the user account is disabled.
438
439        Optionally, an exception can be passed, which will be shown if the current user
440        is a superuser."""
441        self._logger.debug("f(exec): Stage invalid")
442        if self.current_binding and self.current_binding.invalid_response_action in [
443            InvalidResponseAction.RESTART,
444            InvalidResponseAction.RESTART_WITH_CONTEXT,
445        ]:
446            keep_context = (
447                self.current_binding.invalid_response_action
448                == InvalidResponseAction.RESTART_WITH_CONTEXT
449            )
450            self._logger.debug(
451                "f(exec): Invalid response, restarting flow",
452                keep_context=keep_context,
453            )
454            return self.restart_flow(keep_context)
455        self.cancel()
456        challenge_view = AccessDeniedStage(self, error_message)
457        challenge_view.request = self.request
458        return to_stage_response(self.request, challenge_view.get(self.request))

Callback used by stages when data is correct but a policy denies access or the user account is disabled.

Optionally, an error message can be passed, which will be shown if the current user is a superuser.

def cancel(self):
460    def cancel(self):
461        """Cancel current flow execution"""
462        keys_to_delete = [
463            SESSION_KEY_PLAN,
464            SESSION_KEY_GET,
465            # We might need the initial POST payloads for later requests
466            # SESSION_KEY_POST,
467            # We don't delete the history on purpose, as a user might
468            # still be inspecting it.
469            # It's only deleted on a fresh executions
470            # SESSION_KEY_HISTORY,
471        ]
472        self._logger.debug("f(exec): cleaning up")
473        for key in keys_to_delete:
474            if key in self.request.session:
475                del self.request.session[key]

Cancel current flow execution

class CancelView(django.views.generic.base.View):
class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Remove the active plan from the session and redirect the user.

        The value of the `NEXT_ARG_NAME` query argument is used as redirect target
        when it is set and `is_url_absolute` reports it as not absolute. In every
        other case the user is sent to the default invalidation flow."""
        session = request.session
        if SESSION_KEY_PLAN in session:
            del session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        next_url = self.request.GET.get(NEXT_ARG_NAME)
        if not next_url or is_url_absolute(next_url):
            return redirect("authentik_flows:default-invalidation")
        return redirect(next_url)

View which cancels the currently active plan

def get( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
481    def get(self, request: HttpRequest) -> HttpResponse:
482        """View which canels the currently active plan"""
483        if SESSION_KEY_PLAN in request.session:
484            del request.session[SESSION_KEY_PLAN]
485            LOGGER.debug("Canceled current plan")
486        next_url = self.request.GET.get(NEXT_ARG_NAME)
487        if next_url and not is_url_absolute(next_url):
488            return redirect(next_url)
489        return redirect("authentik_flows:default-invalidation")

View which cancels the currently active plan

class ToDefaultFlow(django.views.generic.base.View):
class ToDefaultFlow(View):
    """Redirect to default flow matching by designation"""

    # Designation used by `dispatch` to select the flow; None unless set otherwise.
    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Return the first flow matching `**flow_filter` whose policies pass.

        Matching flows are tried ordered by slug. For each one a PolicyEngine is
        built for the user of `request`. Returns None when no flow passes."""
        candidates = Flow.objects.filter(**flow_filter).order_by("slug")
        for candidate in candidates:
            policy_engine = PolicyEngine(candidate, request.user, request)
            policy_engine.build()
            outcome = policy_engine.result
            if not outcome.passing:
                LOGGER.warning(
                    "flow_by_policy: flow not passing",
                    flow=candidate,
                    messages=outcome.messages,
                )
                continue
            LOGGER.debug("flow_by_policy: flow passing", flow=candidate)
            return candidate
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Resolve the flow to use for `designation`.

        The flow configured on the request's brand is preferred; a brand-level flow
        is only looked up for the authentication and invalidation designations.
        Without one, the first flow of that designation (by slug) whose policies
        pass is used. Raises Http404 when neither lookup yields a flow."""
        brand: Brand = request.brand
        if designation == FlowDesignation.AUTHENTICATION:
            brand_default = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            brand_default = brand.flow_invalidation
        else:
            brand_default = None
        # Fall back to the policy-based lookup only if the brand has no flow set.
        resolved = brand_default or ToDefaultFlow.flow_by_policy(request, designation=designation)
        if not resolved:
            raise Http404
        return resolved

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Redirect to the flow interface of the flow resolved for `self.designation`.

        A plan already stored in the session is deleted if it belongs to a
        different flow than the one being redirected to."""
        flow = ToDefaultFlow.get_flow(request, self.designation)
        session = self.request.session
        # If user already has a pending plan, clear it so we don't have to later.
        if SESSION_KEY_PLAN in session and session[SESSION_KEY_PLAN].flow_pk != flow.pk.hex:
            LOGGER.warning(
                "f(def): Found existing plan for other flow, deleting plan",
                flow_slug=flow.slug,
            )
            del session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)

Redirect to default flow matching by designation

designation: authentik.flows.models.FlowDesignation | None = None
@staticmethod
def flow_by_policy( request: django.http.request.HttpRequest, **flow_filter) -> authentik.flows.models.Flow | None:
497    @staticmethod
498    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
499        """Get a Flow by `**flow_filter` and check if the request from `request` can access it."""
500        flows = Flow.objects.filter(**flow_filter).order_by("slug")
501        for flow in flows:
502            engine = PolicyEngine(flow, request.user, request)
503            engine.build()
504            result = engine.result
505            if result.passing:
506                LOGGER.debug("flow_by_policy: flow passing", flow=flow)
507                return flow
508            LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages)
509        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
510        return None

Get a Flow by **flow_filter and check if the request from request can access it.

@staticmethod
def get_flow( request: django.http.request.HttpRequest, designation: authentik.flows.models.FlowDesignation) -> authentik.flows.models.Flow:
512    @staticmethod
513    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
514        """Get a flow for the selected designation"""
515        brand: Brand = request.brand
516        flow = None
517        # First, attempt to get default flow from brand
518        if designation == FlowDesignation.AUTHENTICATION:
519            flow = brand.flow_authentication
520        elif designation == FlowDesignation.INVALIDATION:
521            flow = brand.flow_invalidation
522        if flow:
523            return flow
524        # If no flow was set, get the first based on slug and policy
525        flow = ToDefaultFlow.flow_by_policy(request, designation=designation)
526        if flow:
527            return flow
528        # If we still don't have a flow, 404
529        raise Http404

Get a flow for the selected designation

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
531    def dispatch(self, request: HttpRequest) -> HttpResponse:
532        flow = ToDefaultFlow.get_flow(request, self.designation)
533        # If user already has a pending plan, clear it so we don't have to later.
534        if SESSION_KEY_PLAN in self.request.session:
535            plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
536            if plan.flow_pk != flow.pk.hex:
537                LOGGER.warning(
538                    "f(def): Found existing plan for other flow, deleting plan",
539                    flow_slug=flow.slug,
540                )
541                del self.request.session[SESSION_KEY_PLAN]
542        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
def to_stage_response( request: django.http.request.HttpRequest, source: django.http.response.HttpResponse) -> django.http.response.HttpResponse:
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert a normal HttpResponse into a JSON challenge response.

    Redirects become a RedirectChallenge, unless they point at the URL currently
    being requested, in which case the redirect is returned untouched. Template
    responses and plain HttpResponse instances are wrapped in a ShellChallenge
    carrying their decoded body. Every other response is returned unchanged."""
    is_redirect = (
        isinstance(source, HttpResponseRedirect)
        or source.status_code == HttpResponseRedirect.status_code
    )
    if is_redirect:
        redirect_url = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == redirect_url:
            return source
        target = str(redirect_url)
        LOGGER.debug(
            "converting to redirect challenge",
            to=target,
            current=request.path,
        )
        return HttpChallengeResponse(RedirectChallenge({"to": target}))
    if isinstance(source, TemplateResponse):
        raw_body = source.render().content
    elif source.__class__ is HttpResponse:
        # Exact class match on purpose: subclasses of HttpResponse are not wrapped.
        raw_body = source.content
    else:
        return source
    return HttpChallengeResponse(ShellChallenge({"body": raw_body.decode("utf-8")}))

Convert normal HttpResponse into JSON Response

class ConfigureFlowInitView(django.contrib.auth.mixins.LoginRequiredMixin, django.views.generic.base.View):
class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Initiate planner for selected change flow and redirect to flow executor,
    or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Plan the configure flow of the stage `stage_uuid` and redirect into it.

        The plan is created with the requesting user as pending user. Http404 is
        raised when the stage does not exist, is not a ConfigurableStage, has no
        configure_flow set, or when the flow is not applicable to the user."""
        try:
            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
            raise Http404
        configure_flow = stage.configure_flow
        if not configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=stage)
            raise Http404

        initial_context = {PLAN_CONTEXT_PENDING_USER: request.user}
        try:
            plan = FlowPlanner(configure_flow).plan(request, initial_context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        return plan.to_redirect(request, configure_flow)

Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.

def get( self, request: django.http.request.HttpRequest, stage_uuid: str) -> django.http.response.HttpResponse:
591    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
592        """Initiate planner for selected change flow and redirect to flow executor,
593        or raise Http404 if no configure_flow has been set."""
594        try:
595            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
596        except Stage.DoesNotExist as exc:
597            raise Http404 from exc
598        if not isinstance(stage, ConfigurableStage):
599            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
600            raise Http404
601        if not stage.configure_flow:
602            LOGGER.debug("Stage has no configure_flow set", stage=stage)
603            raise Http404
604
605        try:
606            plan = FlowPlanner(stage.configure_flow).plan(
607                request, {PLAN_CONTEXT_PENDING_USER: request.user}
608            )
609        except FlowNonApplicableException:
610            LOGGER.warning("Flow not applicable to user")
611            raise Http404 from None
612        return plan.to_redirect(request, stage.configure_flow)

Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.