authentik.flows.views.executor

authentik multi-stage authentication engine

  1"""authentik multi-stage authentication engine"""
  2
  3from copy import deepcopy
  4
  5from django.conf import settings
  6from django.contrib.auth.mixins import LoginRequiredMixin
  7from django.core.cache import cache
  8from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect
  9from django.http.request import QueryDict
 10from django.shortcuts import get_object_or_404, redirect
 11from django.template.response import TemplateResponse
 12from django.urls import reverse
 13from django.utils.decorators import method_decorator
 14from django.utils.translation import gettext as _
 15from django.views.decorators.clickjacking import xframe_options_sameorigin
 16from django.views.generic import View
 17from drf_spectacular.types import OpenApiTypes
 18from drf_spectacular.utils import OpenApiParameter, PolymorphicProxySerializer, extend_schema
 19from rest_framework.permissions import AllowAny
 20from rest_framework.views import APIView
 21from sentry_sdk import capture_exception, start_span
 22from sentry_sdk.api import set_tag
 23from structlog.stdlib import BoundLogger, get_logger
 24
 25from authentik.brands.models import Brand
 26from authentik.events.models import Event, EventAction, cleanse_dict
 27from authentik.flows.apps import HIST_FLOW_EXECUTION_STAGE_TIME
 28from authentik.flows.challenge import (
 29    Challenge,
 30    ChallengeResponse,
 31    FlowErrorChallenge,
 32    HttpChallengeResponse,
 33    RedirectChallenge,
 34    ShellChallenge,
 35    WithUserInfoChallenge,
 36)
 37from authentik.flows.exceptions import EmptyFlowException, FlowNonApplicableException
 38from authentik.flows.models import (
 39    ConfigurableStage,
 40    Flow,
 41    FlowDeniedAction,
 42    FlowDesignation,
 43    FlowStageBinding,
 44    FlowToken,
 45    InvalidResponseAction,
 46    Stage,
 47)
 48from authentik.flows.planner import (
 49    CACHE_PREFIX,
 50    PLAN_CONTEXT_IS_RESTORED,
 51    PLAN_CONTEXT_PENDING_USER,
 52    PLAN_CONTEXT_REDIRECT,
 53    FlowPlan,
 54    FlowPlanner,
 55)
 56from authentik.flows.stage import AccessDeniedStage, StageView
 57from authentik.lib.sentry import SentryIgnoredException, should_ignore_exception
 58from authentik.lib.utils.reflection import all_subclasses, class_to_path
 59from authentik.lib.utils.urls import is_url_absolute, redirect_with_qs
 60from authentik.policies.engine import PolicyEngine
 61
# Module-level logger, used by code that has no per-flow bound logger
LOGGER = get_logger()
# Argument used to redirect user after login
NEXT_ARG_NAME = "next"
# Session key under which the active FlowPlan is stored
SESSION_KEY_PLAN = "authentik/flows/plan"
# Session key under which the parsed upstream query string is stored
SESSION_KEY_GET = "authentik/flows/get"
# Session key for initial POST payloads; not written anywhere in this module
SESSION_KEY_POST = "authentik/flows/post"
# Session key holding a list of plan snapshots, one per successfully passed stage
SESSION_KEY_HISTORY = "authentik/flows/history"
# Key inside the upstream query string that carries a flow token
QS_KEY_TOKEN = "flow_token"  # nosec
# GET parameter of the executor request that carries the upstream query string
QS_QUERY = "query"
 71
 72
 73def challenge_types():
 74    """This function returns a mapping which contains all subclasses of challenges
 75    subclasses of Challenge, and Challenge itself."""
 76    mapping = {}
 77    for cls in all_subclasses(Challenge):
 78        if cls == WithUserInfoChallenge:
 79            continue
 80        mapping[cls().fields["component"].default] = cls
 81    return mapping
 82
 83
 84def challenge_response_types():
 85    """This function returns a mapping which contains all subclasses of challenges
 86    subclasses of Challenge, and Challenge itself."""
 87    mapping = {}
 88    for cls in all_subclasses(ChallengeResponse):
 89        mapping[cls(stage=None).fields["component"].default] = cls
 90    return mapping
 91
 92
class InvalidStageError(SentryIgnoredException):
    """Error raised when a challenge from a stage is not valid.

    FlowExecutorView.dispatch catches this error and answers with
    stage_invalid(), using the error's string form as the message."""
 95
 96
 97@method_decorator(xframe_options_sameorigin, name="dispatch")
 98class FlowExecutorView(APIView):
 99    """Flow executor, passing requests to Stage Views"""
100
101    permission_classes = [AllowAny]
102
103    flow: Flow = None
104
105    plan: FlowPlan | None = None
106    current_binding: FlowStageBinding | None = None
107    current_stage: Stage
108    current_stage_view: View
109
110    _logger: BoundLogger
111
112    def setup(self, request: HttpRequest, flow_slug: str):
113        super().setup(request, flow_slug=flow_slug)
114        if not self.flow:
115            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
116        self._logger = get_logger().bind(flow_slug=flow_slug)
117        set_tag("authentik.flow", self.flow.slug)
118
119    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
120        """When a flow is non-applicable check if user is on the correct domain"""
121        if self.flow.denied_action in [
122            FlowDeniedAction.CONTINUE,
123            FlowDeniedAction.MESSAGE_CONTINUE,
124        ]:
125            next_url = self.request.GET.get(NEXT_ARG_NAME)
126            if next_url and not is_url_absolute(next_url):
127                self._logger.debug("f(exec): Redirecting to next on fail")
128                return to_stage_response(self.request, redirect(next_url))
129        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
130            return to_stage_response(
131                self.request, redirect(reverse("authentik_core:root-redirect"))
132            )
133        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))
134
135    def _check_flow_token(self, key: str) -> FlowPlan | None:
136        """Check if the user is using a flow token to restore a plan"""
137        token: FlowToken | None = FlowToken.objects.filter(key=key).first()
138        if not token:
139            return None
140        plan = None
141        try:
142            plan = token.plan
143        except (AttributeError, EOFError, ImportError, IndexError) as exc:
144            LOGGER.warning("f(exec): Failed to restore token plan", exc=exc)
145        finally:
146            if token.revoke_on_execution:
147                token.delete()
148        if not isinstance(plan, FlowPlan):
149            return None
150        if existing_plan := self.request.session.get(SESSION_KEY_PLAN):
151            plan.context.update(existing_plan.context)
152        plan.context[PLAN_CONTEXT_IS_RESTORED] = token
153        self._logger.debug("f(exec): restored flow plan from token", plan=plan)
154        return plan
155
156    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
157        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
158            span.set_data("authentik Flow", self.flow.slug)
159            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
160            if QS_KEY_TOKEN in get_params:
161                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
162                if plan:
163                    self.request.session[SESSION_KEY_PLAN] = plan
164            # Early check if there's an active Plan for the current session
165            if SESSION_KEY_PLAN in self.request.session:
166                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
167                if self.plan.flow_pk != self.flow.pk.hex:
168                    self._logger.warning(
169                        "f(exec): Found existing plan for other flow, deleting plan",
170                        other_flow=self.plan.flow_pk,
171                    )
172                    # Existing plan is deleted from session and instance
173                    self.plan = None
174                    self.cancel()
175                else:
176                    self._logger.debug("f(exec): Continuing existing plan")
177
178            # Initial flow request, check if we have an upstream query string passed in
179            request.session[SESSION_KEY_GET] = get_params
180            # Don't check session again as we've either already loaded the plan or we need to plan
181            if not self.plan:
182                request.session[SESSION_KEY_HISTORY] = []
183                self._logger.debug("f(exec): No active Plan found, initiating planner")
184                try:
185                    self.plan = self._initiate_plan()
186                except FlowNonApplicableException as exc:
187                    # If we're this flow is for authentication and the user is already authenticated
188                    # continue to the next URL
189                    if (
190                        self.flow.designation == FlowDesignation.AUTHENTICATION
191                        and self.request.user.is_authenticated
192                    ):
193                        return self._flow_done()
194                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
195                    return self.handle_invalid_flow(exc)
196                except EmptyFlowException as exc:
197                    self._logger.warning("f(exec): Flow is empty", exc=exc)
198                    # To match behaviour with loading an empty flow plan from cache,
199                    # we don't show an error message here, but rather call _flow_done()
200                    return self._flow_done()
201            # We don't save the Plan after getting the next stage
202            # as it hasn't been successfully passed yet
203            try:
204                # This is the first time we actually access any attribute on the selected plan
205                # if the cached plan is from an older version, it might have different attributes
206                # in which case we just delete the plan and invalidate everything
207                next_binding = self.plan.next(self.request)
208            except Exception as exc:  # noqa
209                self._logger.warning(
210                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
211                )
212                keys = cache.keys(f"{CACHE_PREFIX}*")
213                cache.delete_many(keys)
214                return self.stage_invalid()
215            if not next_binding:
216                self._logger.debug("f(exec): no more stages, flow is done.")
217                return self._flow_done()
218            self.current_binding = next_binding
219            self.current_stage = next_binding.stage
220            self._logger.debug(
221                "f(exec): Current stage",
222                current_stage=self.current_stage,
223                flow_slug=self.flow.slug,
224            )
225            try:
226                stage_cls = self.current_stage.view
227            except NotImplementedError as exc:
228                self._logger.debug("Error getting stage type", exc=exc)
229                return self.stage_invalid()
230            self.current_stage_view = stage_cls(self)
231            self.current_stage_view.args = self.args
232            self.current_stage_view.kwargs = self.kwargs
233            self.current_stage_view.request = request
234            try:
235                return super().dispatch(request)
236            except InvalidStageError as exc:
237                return self.stage_invalid(str(exc))
238
239    def handle_exception(self, exc: Exception) -> HttpResponse:
240        """Handle exception in stage execution"""
241        if settings.DEBUG or settings.TEST:
242            raise exc
243        self._logger.warning(exc)
244        if not should_ignore_exception(exc):
245            capture_exception(exc)
246            Event.new(
247                action=EventAction.SYSTEM_EXCEPTION,
248                message="System exception during flow execution.",
249            ).with_exception(exc).from_http(self.request)
250        challenge = FlowErrorChallenge(self.request, exc)
251        challenge.is_valid(raise_exception=True)
252        return to_stage_response(self.request, HttpChallengeResponse(challenge))
253
254    @extend_schema(
255        responses={
256            200: PolymorphicProxySerializer(
257                component_name="ChallengeTypes",
258                serializers=challenge_types,
259                resource_type_field_name="component",
260            ),
261        },
262        request=OpenApiTypes.NONE,
263        parameters=[
264            OpenApiParameter(
265                name="query",
266                location=OpenApiParameter.QUERY,
267                required=True,
268                description="Querystring as received",
269                type=OpenApiTypes.STR,
270            )
271        ],
272        operation_id="flows_executor_get",
273    )
274    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
275        """Get the next pending challenge from the currently active flow."""
276        class_path = class_to_path(self.current_stage_view.__class__)
277        self._logger.debug(
278            "f(exec): Passing GET",
279            view_class=class_path,
280            stage=self.current_stage,
281        )
282        try:
283            with (
284                start_span(
285                    op="authentik.flow.executor.stage",
286                    name=class_path,
287                ) as span,
288                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
289                    method=request.method.upper(),
290                    stage_type=class_path,
291                ).time(),
292            ):
293                span.set_data("Method", request.method.upper())
294                span.set_data("authentik Stage", self.current_stage_view)
295                span.set_data("authentik Flow", self.flow.slug)
296                stage_response = self.current_stage_view.dispatch(request)
297                return to_stage_response(request, stage_response)
298        except Exception as exc:  # noqa
299            return self.handle_exception(exc)
300
301    @extend_schema(
302        responses={
303            200: PolymorphicProxySerializer(
304                component_name="ChallengeTypes",
305                serializers=challenge_types,
306                resource_type_field_name="component",
307            ),
308        },
309        request=PolymorphicProxySerializer(
310            component_name="FlowChallengeResponse",
311            serializers=challenge_response_types,
312            resource_type_field_name="component",
313        ),
314        parameters=[
315            OpenApiParameter(
316                name="query",
317                location=OpenApiParameter.QUERY,
318                required=True,
319                description="Querystring as received",
320                type=OpenApiTypes.STR,
321            )
322        ],
323        operation_id="flows_executor_solve",
324    )
325    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
326        """Solve the previously retrieved challenge and advanced to the next stage."""
327        class_path = class_to_path(self.current_stage_view.__class__)
328        self._logger.debug(
329            "f(exec): Passing POST",
330            view_class=class_path,
331            stage=self.current_stage,
332        )
333        try:
334            with (
335                start_span(
336                    op="authentik.flow.executor.stage",
337                    name=class_path,
338                ) as span,
339                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
340                    method=request.method.upper(),
341                    stage_type=class_path,
342                ).time(),
343            ):
344                span.set_data("Method", request.method.upper())
345                span.set_data("authentik Stage", self.current_stage_view)
346                span.set_data("authentik Flow", self.flow.slug)
347                stage_response = self.current_stage_view.dispatch(request)
348                return to_stage_response(request, stage_response)
349        except Exception as exc:  # noqa
350            return self.handle_exception(exc)
351
352    def _initiate_plan(self) -> FlowPlan:
353        planner = FlowPlanner(self.flow)
354        plan = planner.plan(self.request)
355        self.request.session[SESSION_KEY_PLAN] = plan
356        try:
357            # Call the has_stages getter to check that
358            # there are no issues with the class we might've gotten
359            # from the cache. If there are errors, just delete all cached flows
360            _ = plan.has_stages
361        except Exception:  # noqa
362            keys = cache.keys(f"{CACHE_PREFIX}*")
363            cache.delete_many(keys)
364            return self._initiate_plan()
365        return plan
366
367    def restart_flow(self, keep_context=False) -> HttpResponse:
368        """Restart the currently active flow, optionally keeping the current context"""
369        planner = FlowPlanner(self.flow)
370        planner.use_cache = False
371        default_context = None
372        if keep_context:
373            default_context = self.plan.context
374        try:
375            plan = planner.plan(self.request, default_context)
376        except FlowNonApplicableException as exc:
377            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
378            return self.handle_invalid_flow(exc)
379        self.request.session[SESSION_KEY_PLAN] = plan
380        kwargs = self.kwargs
381        kwargs.update({"flow_slug": self.flow.slug})
382        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
383
384    def _flow_done(self) -> HttpResponse:
385        """User Successfully passed all stages"""
386        # Since this is wrapped by the ExecutorShell, the next argument is saved in the session
387        # extract the next param before cancel as that cleans it
388        if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context:
389            # The context `redirect` variable can only be set by
390            # an expression policy or authentik itself, so we don't
391            # check if its an absolute URL or a relative one
392            self.cancel()
393            return to_stage_response(
394                self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT))
395            )
396        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(
397            NEXT_ARG_NAME, "authentik_core:root-redirect"
398        )
399        self.cancel()
400        if next_param and not is_url_absolute(next_param):
401            return to_stage_response(self.request, redirect_with_qs(next_param))
402        return to_stage_response(
403            self.request, self.stage_invalid(error_message=_("Invalid next URL"))
404        )
405
406    def stage_ok(self) -> HttpResponse:
407        """Callback called by stages upon successful completion.
408        Persists updated plan and context to session."""
409        self._logger.debug(
410            "f(exec): Stage ok",
411            stage_class=class_to_path(self.current_stage_view.__class__),
412        )
413        if isinstance(self.current_stage_view, StageView):
414            self.current_stage_view.cleanup()
415        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
416        self.plan.pop()
417        self.request.session[SESSION_KEY_PLAN] = self.plan
418        if self.plan.bindings:
419            self._logger.debug(
420                "f(exec): Continuing with next stage",
421                remaining=len(self.plan.bindings),
422            )
423            kwargs = self.kwargs
424            kwargs.update({"flow_slug": self.flow.slug})
425            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
426        # User passed all stages
427        self._logger.debug(
428            "f(exec): User passed all stages",
429            context=cleanse_dict(self.plan.context),
430        )
431        return self._flow_done()
432
433    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
434        """Callback used stage when data is correct but a policy denies access
435        or the user account is disabled.
436
437        Optionally, an exception can be passed, which will be shown if the current user
438        is a superuser."""
439        self._logger.debug("f(exec): Stage invalid")
440        if self.current_binding and self.current_binding.invalid_response_action in [
441            InvalidResponseAction.RESTART,
442            InvalidResponseAction.RESTART_WITH_CONTEXT,
443        ]:
444            keep_context = (
445                self.current_binding.invalid_response_action
446                == InvalidResponseAction.RESTART_WITH_CONTEXT
447            )
448            self._logger.debug(
449                "f(exec): Invalid response, restarting flow",
450                keep_context=keep_context,
451            )
452            return self.restart_flow(keep_context)
453        self.cancel()
454        challenge_view = AccessDeniedStage(self, error_message)
455        challenge_view.request = self.request
456        return to_stage_response(self.request, challenge_view.get(self.request))
457
458    def cancel(self):
459        """Cancel current flow execution"""
460        keys_to_delete = [
461            SESSION_KEY_PLAN,
462            SESSION_KEY_GET,
463            # We might need the initial POST payloads for later requests
464            # SESSION_KEY_POST,
465            # We don't delete the history on purpose, as a user might
466            # still be inspecting it.
467            # It's only deleted on a fresh executions
468            # SESSION_KEY_HISTORY,
469        ]
470        self._logger.debug("f(exec): cleaning up")
471        for key in keys_to_delete:
472            if key in self.request.session:
473                del self.request.session[key]
474
475
class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Remove the active plan from the session and redirect.

        Redirects to the `next` GET argument when it is set and not an absolute
        URL, otherwise to the default invalidation flow."""
        if SESSION_KEY_PLAN in request.session:
            del request.session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        target = self.request.GET.get(NEXT_ARG_NAME)
        if not target or is_url_absolute(target):
            return redirect("authentik_flows:default-invalidation")
        return redirect(target)
488
489
class ToDefaultFlow(View):
    """Redirect to default flow matching by designation"""

    # Designation of the flow to redirect to
    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Return the first flow (ordered by slug) matching `**flow_filter` whose
        policies pass for `request`, or None if there is no such flow."""
        for candidate in Flow.objects.filter(**flow_filter).order_by("slug"):
            engine = PolicyEngine(candidate, request.user, request)
            engine.build()
            outcome = engine.result
            if not outcome.passing:
                LOGGER.warning(
                    "flow_by_policy: flow not passing",
                    flow=candidate,
                    messages=outcome.messages,
                )
                continue
            LOGGER.debug("flow_by_policy: flow passing", flow=candidate)
            return candidate
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Get a flow for the selected designation.

        The flow configured on the request's brand wins (authentication and
        invalidation designations only); otherwise the first flow passing its
        policies is used. Raises Http404 when neither yields a flow."""
        brand: Brand = request.brand
        if designation == FlowDesignation.AUTHENTICATION:
            brand_flow = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            brand_flow = brand.flow_invalidation
        else:
            brand_flow = None
        # Fall back to the policy-based lookup only if the brand has no flow set
        flow = brand_flow or ToDefaultFlow.flow_by_policy(request, designation=designation)
        if not flow:
            raise Http404
        return flow

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Redirect to the flow interface of the default flow for `self.designation`."""
        flow = ToDefaultFlow.get_flow(request, self.designation)
        session = self.request.session
        # If user already has a pending plan, clear it so we don't have to later.
        if SESSION_KEY_PLAN in session:
            pending_plan: FlowPlan = session[SESSION_KEY_PLAN]
            if pending_plan.flow_pk != flow.pk.hex:
                LOGGER.warning(
                    "f(def): Found existing plan for other flow, deleting plan",
                    flow_slug=flow.slug,
                )
                del session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
541
542
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert normal HttpResponse into JSON Response.

    Redirects become a RedirectChallenge, unless they point at the URL that is
    currently being requested. TemplateResponses and plain HttpResponses become
    a ShellChallenge carrying the body. Everything else is returned unchanged."""
    looks_like_redirect = (
        isinstance(source, HttpResponseRedirect)
        or source.status_code == HttpResponseRedirect.status_code
    )
    if looks_like_redirect:
        location = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == location:
            return source
        target = str(location)
        LOGGER.debug(
            "converting to redirect challenge",
            to=target,
            current=request.path,
        )
        return HttpChallengeResponse(RedirectChallenge({"to": target}))
    if isinstance(source, TemplateResponse):
        markup = source.render().content.decode("utf-8")
    elif source.__class__ == HttpResponse:
        # Exact class comparison on purpose: subclasses of HttpResponse pass through
        markup = source.content.decode("utf-8")
    else:
        return source
    return HttpChallengeResponse(ShellChallenge({"body": markup}))
583
584
class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Initiate planner for selected change flow and redirect to flow executor,
    or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Plan the configure flow of the stage `stage_uuid` for the current user.

        Raises Http404 when the stage does not exist, is not a ConfigurableStage,
        has no configure_flow set, or the flow is not applicable to the user."""
        try:
            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
            raise Http404
        configure_flow = stage.configure_flow
        if not configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=stage)
            raise Http404

        # The requesting user is the pending user of the configuration flow
        initial_context = {PLAN_CONTEXT_PENDING_USER: request.user}
        try:
            plan = FlowPlanner(configure_flow).plan(request, initial_context)
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        return plan.to_redirect(request, configure_flow)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
NEXT_ARG_NAME = 'next'
SESSION_KEY_PLAN = 'authentik/flows/plan'
SESSION_KEY_GET = 'authentik/flows/get'
SESSION_KEY_POST = 'authentik/flows/post'
SESSION_KEY_HISTORY = 'authentik/flows/history'
QS_KEY_TOKEN = 'flow_token'
QS_QUERY = 'query'
def challenge_types():
def challenge_types():
    """Build the lookup table of known challenge classes.

    Every class returned by `all_subclasses(Challenge)` is instantiated without
    arguments and registered under the default value of its `component` field.
    `WithUserInfoChallenge` is left out of the mapping."""
    return {
        challenge_cls().fields["component"].default: challenge_cls
        for challenge_cls in all_subclasses(Challenge)
        if challenge_cls != WithUserInfoChallenge
    }

This function returns a mapping from the default value of the `component` field to the class, for every class returned by all_subclasses(Challenge) except WithUserInfoChallenge.

def challenge_response_types():
def challenge_response_types():
    """Build the lookup table of known challenge response classes.

    Every class returned by `all_subclasses(ChallengeResponse)` is instantiated
    with `stage=None` and registered under the default value of its
    `component` field."""
    return {
        response_cls(stage=None).fields["component"].default: response_cls
        for response_cls in all_subclasses(ChallengeResponse)
    }

This function returns a mapping from the default value of the `component` field to the class, for every class returned by all_subclasses(ChallengeResponse).

class InvalidStageError(authentik.lib.sentry.SentryIgnoredException):
class InvalidStageError(SentryIgnoredException):
    """Error raised when a challenge from a stage is not valid.

    FlowExecutorView.dispatch catches this error and answers with
    stage_invalid(), using the error's string form as the message."""

Error raised when a challenge from a stage is not valid

@method_decorator(xframe_options_sameorigin, name='dispatch')
class FlowExecutorView(rest_framework.views.APIView):
 98@method_decorator(xframe_options_sameorigin, name="dispatch")
 99class FlowExecutorView(APIView):
100    """Flow executor, passing requests to Stage Views"""
101
102    permission_classes = [AllowAny]
103
104    flow: Flow = None
105
106    plan: FlowPlan | None = None
107    current_binding: FlowStageBinding | None = None
108    current_stage: Stage
109    current_stage_view: View
110
111    _logger: BoundLogger
112
113    def setup(self, request: HttpRequest, flow_slug: str):
114        super().setup(request, flow_slug=flow_slug)
115        if not self.flow:
116            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
117        self._logger = get_logger().bind(flow_slug=flow_slug)
118        set_tag("authentik.flow", self.flow.slug)
119
120    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
121        """When a flow is non-applicable check if user is on the correct domain"""
122        if self.flow.denied_action in [
123            FlowDeniedAction.CONTINUE,
124            FlowDeniedAction.MESSAGE_CONTINUE,
125        ]:
126            next_url = self.request.GET.get(NEXT_ARG_NAME)
127            if next_url and not is_url_absolute(next_url):
128                self._logger.debug("f(exec): Redirecting to next on fail")
129                return to_stage_response(self.request, redirect(next_url))
130        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
131            return to_stage_response(
132                self.request, redirect(reverse("authentik_core:root-redirect"))
133            )
134        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))
135
136    def _check_flow_token(self, key: str) -> FlowPlan | None:
137        """Check if the user is using a flow token to restore a plan"""
138        token: FlowToken | None = FlowToken.objects.filter(key=key).first()
139        if not token:
140            return None
141        plan = None
142        try:
143            plan = token.plan
144        except (AttributeError, EOFError, ImportError, IndexError) as exc:
145            LOGGER.warning("f(exec): Failed to restore token plan", exc=exc)
146        finally:
147            if token.revoke_on_execution:
148                token.delete()
149        if not isinstance(plan, FlowPlan):
150            return None
151        if existing_plan := self.request.session.get(SESSION_KEY_PLAN):
152            plan.context.update(existing_plan.context)
153        plan.context[PLAN_CONTEXT_IS_RESTORED] = token
154        self._logger.debug("f(exec): restored flow plan from token", plan=plan)
155        return plan
156
157    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
158        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
159            span.set_data("authentik Flow", self.flow.slug)
160            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
161            if QS_KEY_TOKEN in get_params:
162                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
163                if plan:
164                    self.request.session[SESSION_KEY_PLAN] = plan
165            # Early check if there's an active Plan for the current session
166            if SESSION_KEY_PLAN in self.request.session:
167                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
168                if self.plan.flow_pk != self.flow.pk.hex:
169                    self._logger.warning(
170                        "f(exec): Found existing plan for other flow, deleting plan",
171                        other_flow=self.plan.flow_pk,
172                    )
173                    # Existing plan is deleted from session and instance
174                    self.plan = None
175                    self.cancel()
176                else:
177                    self._logger.debug("f(exec): Continuing existing plan")
178
179            # Initial flow request, check if we have an upstream query string passed in
180            request.session[SESSION_KEY_GET] = get_params
181            # Don't check session again as we've either already loaded the plan or we need to plan
182            if not self.plan:
183                request.session[SESSION_KEY_HISTORY] = []
184                self._logger.debug("f(exec): No active Plan found, initiating planner")
185                try:
186                    self.plan = self._initiate_plan()
187                except FlowNonApplicableException as exc:
188                    # If we're this flow is for authentication and the user is already authenticated
189                    # continue to the next URL
190                    if (
191                        self.flow.designation == FlowDesignation.AUTHENTICATION
192                        and self.request.user.is_authenticated
193                    ):
194                        return self._flow_done()
195                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
196                    return self.handle_invalid_flow(exc)
197                except EmptyFlowException as exc:
198                    self._logger.warning("f(exec): Flow is empty", exc=exc)
199                    # To match behaviour with loading an empty flow plan from cache,
200                    # we don't show an error message here, but rather call _flow_done()
201                    return self._flow_done()
202            # We don't save the Plan after getting the next stage
203            # as it hasn't been successfully passed yet
204            try:
205                # This is the first time we actually access any attribute on the selected plan
206                # if the cached plan is from an older version, it might have different attributes
207                # in which case we just delete the plan and invalidate everything
208                next_binding = self.plan.next(self.request)
209            except Exception as exc:  # noqa
210                self._logger.warning(
211                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
212                )
213                keys = cache.keys(f"{CACHE_PREFIX}*")
214                cache.delete_many(keys)
215                return self.stage_invalid()
216            if not next_binding:
217                self._logger.debug("f(exec): no more stages, flow is done.")
218                return self._flow_done()
219            self.current_binding = next_binding
220            self.current_stage = next_binding.stage
221            self._logger.debug(
222                "f(exec): Current stage",
223                current_stage=self.current_stage,
224                flow_slug=self.flow.slug,
225            )
226            try:
227                stage_cls = self.current_stage.view
228            except NotImplementedError as exc:
229                self._logger.debug("Error getting stage type", exc=exc)
230                return self.stage_invalid()
231            self.current_stage_view = stage_cls(self)
232            self.current_stage_view.args = self.args
233            self.current_stage_view.kwargs = self.kwargs
234            self.current_stage_view.request = request
235            try:
236                return super().dispatch(request)
237            except InvalidStageError as exc:
238                return self.stage_invalid(str(exc))
239
240    def handle_exception(self, exc: Exception) -> HttpResponse:
241        """Handle exception in stage execution"""
242        if settings.DEBUG or settings.TEST:
243            raise exc
244        self._logger.warning(exc)
245        if not should_ignore_exception(exc):
246            capture_exception(exc)
247            Event.new(
248                action=EventAction.SYSTEM_EXCEPTION,
249                message="System exception during flow execution.",
250            ).with_exception(exc).from_http(self.request)
251        challenge = FlowErrorChallenge(self.request, exc)
252        challenge.is_valid(raise_exception=True)
253        return to_stage_response(self.request, HttpChallengeResponse(challenge))
254
255    @extend_schema(
256        responses={
257            200: PolymorphicProxySerializer(
258                component_name="ChallengeTypes",
259                serializers=challenge_types,
260                resource_type_field_name="component",
261            ),
262        },
263        request=OpenApiTypes.NONE,
264        parameters=[
265            OpenApiParameter(
266                name="query",
267                location=OpenApiParameter.QUERY,
268                required=True,
269                description="Querystring as received",
270                type=OpenApiTypes.STR,
271            )
272        ],
273        operation_id="flows_executor_get",
274    )
275    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
276        """Get the next pending challenge from the currently active flow."""
277        class_path = class_to_path(self.current_stage_view.__class__)
278        self._logger.debug(
279            "f(exec): Passing GET",
280            view_class=class_path,
281            stage=self.current_stage,
282        )
283        try:
284            with (
285                start_span(
286                    op="authentik.flow.executor.stage",
287                    name=class_path,
288                ) as span,
289                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
290                    method=request.method.upper(),
291                    stage_type=class_path,
292                ).time(),
293            ):
294                span.set_data("Method", request.method.upper())
295                span.set_data("authentik Stage", self.current_stage_view)
296                span.set_data("authentik Flow", self.flow.slug)
297                stage_response = self.current_stage_view.dispatch(request)
298                return to_stage_response(request, stage_response)
299        except Exception as exc:  # noqa
300            return self.handle_exception(exc)
301
302    @extend_schema(
303        responses={
304            200: PolymorphicProxySerializer(
305                component_name="ChallengeTypes",
306                serializers=challenge_types,
307                resource_type_field_name="component",
308            ),
309        },
310        request=PolymorphicProxySerializer(
311            component_name="FlowChallengeResponse",
312            serializers=challenge_response_types,
313            resource_type_field_name="component",
314        ),
315        parameters=[
316            OpenApiParameter(
317                name="query",
318                location=OpenApiParameter.QUERY,
319                required=True,
320                description="Querystring as received",
321                type=OpenApiTypes.STR,
322            )
323        ],
324        operation_id="flows_executor_solve",
325    )
326    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
327        """Solve the previously retrieved challenge and advanced to the next stage."""
328        class_path = class_to_path(self.current_stage_view.__class__)
329        self._logger.debug(
330            "f(exec): Passing POST",
331            view_class=class_path,
332            stage=self.current_stage,
333        )
334        try:
335            with (
336                start_span(
337                    op="authentik.flow.executor.stage",
338                    name=class_path,
339                ) as span,
340                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
341                    method=request.method.upper(),
342                    stage_type=class_path,
343                ).time(),
344            ):
345                span.set_data("Method", request.method.upper())
346                span.set_data("authentik Stage", self.current_stage_view)
347                span.set_data("authentik Flow", self.flow.slug)
348                stage_response = self.current_stage_view.dispatch(request)
349                return to_stage_response(request, stage_response)
350        except Exception as exc:  # noqa
351            return self.handle_exception(exc)
352
353    def _initiate_plan(self) -> FlowPlan:
354        planner = FlowPlanner(self.flow)
355        plan = planner.plan(self.request)
356        self.request.session[SESSION_KEY_PLAN] = plan
357        try:
358            # Call the has_stages getter to check that
359            # there are no issues with the class we might've gotten
360            # from the cache. If there are errors, just delete all cached flows
361            _ = plan.has_stages
362        except Exception:  # noqa
363            keys = cache.keys(f"{CACHE_PREFIX}*")
364            cache.delete_many(keys)
365            return self._initiate_plan()
366        return plan
367
368    def restart_flow(self, keep_context=False) -> HttpResponse:
369        """Restart the currently active flow, optionally keeping the current context"""
370        planner = FlowPlanner(self.flow)
371        planner.use_cache = False
372        default_context = None
373        if keep_context:
374            default_context = self.plan.context
375        try:
376            plan = planner.plan(self.request, default_context)
377        except FlowNonApplicableException as exc:
378            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
379            return self.handle_invalid_flow(exc)
380        self.request.session[SESSION_KEY_PLAN] = plan
381        kwargs = self.kwargs
382        kwargs.update({"flow_slug": self.flow.slug})
383        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
384
385    def _flow_done(self) -> HttpResponse:
386        """User Successfully passed all stages"""
387        # Since this is wrapped by the ExecutorShell, the next argument is saved in the session
388        # extract the next param before cancel as that cleans it
389        if self.plan and PLAN_CONTEXT_REDIRECT in self.plan.context:
390            # The context `redirect` variable can only be set by
391            # an expression policy or authentik itself, so we don't
392            # check if its an absolute URL or a relative one
393            self.cancel()
394            return to_stage_response(
395                self.request, redirect(self.plan.context.get(PLAN_CONTEXT_REDIRECT))
396            )
397        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(
398            NEXT_ARG_NAME, "authentik_core:root-redirect"
399        )
400        self.cancel()
401        if next_param and not is_url_absolute(next_param):
402            return to_stage_response(self.request, redirect_with_qs(next_param))
403        return to_stage_response(
404            self.request, self.stage_invalid(error_message=_("Invalid next URL"))
405        )
406
407    def stage_ok(self) -> HttpResponse:
408        """Callback called by stages upon successful completion.
409        Persists updated plan and context to session."""
410        self._logger.debug(
411            "f(exec): Stage ok",
412            stage_class=class_to_path(self.current_stage_view.__class__),
413        )
414        if isinstance(self.current_stage_view, StageView):
415            self.current_stage_view.cleanup()
416        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
417        self.plan.pop()
418        self.request.session[SESSION_KEY_PLAN] = self.plan
419        if self.plan.bindings:
420            self._logger.debug(
421                "f(exec): Continuing with next stage",
422                remaining=len(self.plan.bindings),
423            )
424            kwargs = self.kwargs
425            kwargs.update({"flow_slug": self.flow.slug})
426            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
427        # User passed all stages
428        self._logger.debug(
429            "f(exec): User passed all stages",
430            context=cleanse_dict(self.plan.context),
431        )
432        return self._flow_done()
433
434    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
435        """Callback used stage when data is correct but a policy denies access
436        or the user account is disabled.
437
438        Optionally, an exception can be passed, which will be shown if the current user
439        is a superuser."""
440        self._logger.debug("f(exec): Stage invalid")
441        if self.current_binding and self.current_binding.invalid_response_action in [
442            InvalidResponseAction.RESTART,
443            InvalidResponseAction.RESTART_WITH_CONTEXT,
444        ]:
445            keep_context = (
446                self.current_binding.invalid_response_action
447                == InvalidResponseAction.RESTART_WITH_CONTEXT
448            )
449            self._logger.debug(
450                "f(exec): Invalid response, restarting flow",
451                keep_context=keep_context,
452            )
453            return self.restart_flow(keep_context)
454        self.cancel()
455        challenge_view = AccessDeniedStage(self, error_message)
456        challenge_view.request = self.request
457        return to_stage_response(self.request, challenge_view.get(self.request))
458
459    def cancel(self):
460        """Cancel current flow execution"""
461        keys_to_delete = [
462            SESSION_KEY_PLAN,
463            SESSION_KEY_GET,
464            # We might need the initial POST payloads for later requests
465            # SESSION_KEY_POST,
466            # We don't delete the history on purpose, as a user might
467            # still be inspecting it.
468            # It's only deleted on a fresh executions
469            # SESSION_KEY_HISTORY,
470        ]
471        self._logger.debug("f(exec): cleaning up")
472        for key in keys_to_delete:
473            if key in self.request.session:
474                del self.request.session[key]

Flow executor, passing requests to Stage Views

permission_classes = [<class 'rest_framework.permissions.AllowAny'>]
plan: authentik.flows.planner.FlowPlan | None = None
current_binding: authentik.flows.models.FlowStageBinding | None = None
current_stage_view: django.views.generic.base.View
def setup(self, request: django.http.request.HttpRequest, flow_slug: str):
113    def setup(self, request: HttpRequest, flow_slug: str):
114        super().setup(request, flow_slug=flow_slug)
115        if not self.flow:
116            self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
117        self._logger = get_logger().bind(flow_slug=flow_slug)
118        set_tag("authentik.flow", self.flow.slug)

Initialize attributes shared by all view methods.

def handle_invalid_flow( self, exc: authentik.flows.exceptions.FlowNonApplicableException) -> django.http.response.HttpResponse:
120    def handle_invalid_flow(self, exc: FlowNonApplicableException) -> HttpResponse:
121        """When a flow is non-applicable check if user is on the correct domain"""
122        if self.flow.denied_action in [
123            FlowDeniedAction.CONTINUE,
124            FlowDeniedAction.MESSAGE_CONTINUE,
125        ]:
126            next_url = self.request.GET.get(NEXT_ARG_NAME)
127            if next_url and not is_url_absolute(next_url):
128                self._logger.debug("f(exec): Redirecting to next on fail")
129                return to_stage_response(self.request, redirect(next_url))
130        if self.flow.denied_action == FlowDeniedAction.CONTINUE:
131            return to_stage_response(
132                self.request, redirect(reverse("authentik_core:root-redirect"))
133            )
134        return to_stage_response(self.request, self.stage_invalid(error_message=exc.messages))

When a flow is non-applicable, redirect to the next URL, redirect to the root view, or show an access-denied message, depending on the flow's denied action.

def dispatch( self, request: django.http.request.HttpRequest, flow_slug: str) -> django.http.response.HttpResponse:
157    def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
158        with start_span(op="authentik.flow.executor.dispatch", name=self.flow.slug) as span:
159            span.set_data("authentik Flow", self.flow.slug)
160            get_params = QueryDict(request.GET.get(QS_QUERY, ""))
161            if QS_KEY_TOKEN in get_params:
162                plan = self._check_flow_token(get_params[QS_KEY_TOKEN])
163                if plan:
164                    self.request.session[SESSION_KEY_PLAN] = plan
165            # Early check if there's an active Plan for the current session
166            if SESSION_KEY_PLAN in self.request.session:
167                self.plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
168                if self.plan.flow_pk != self.flow.pk.hex:
169                    self._logger.warning(
170                        "f(exec): Found existing plan for other flow, deleting plan",
171                        other_flow=self.plan.flow_pk,
172                    )
173                    # Existing plan is deleted from session and instance
174                    self.plan = None
175                    self.cancel()
176                else:
177                    self._logger.debug("f(exec): Continuing existing plan")
178
179            # Initial flow request, check if we have an upstream query string passed in
180            request.session[SESSION_KEY_GET] = get_params
181            # Don't check session again as we've either already loaded the plan or we need to plan
182            if not self.plan:
183                request.session[SESSION_KEY_HISTORY] = []
184                self._logger.debug("f(exec): No active Plan found, initiating planner")
185                try:
186                    self.plan = self._initiate_plan()
187                except FlowNonApplicableException as exc:
188                    # If we're this flow is for authentication and the user is already authenticated
189                    # continue to the next URL
190                    if (
191                        self.flow.designation == FlowDesignation.AUTHENTICATION
192                        and self.request.user.is_authenticated
193                    ):
194                        return self._flow_done()
195                    self._logger.warning("f(exec): Flow not applicable to current user", exc=exc)
196                    return self.handle_invalid_flow(exc)
197                except EmptyFlowException as exc:
198                    self._logger.warning("f(exec): Flow is empty", exc=exc)
199                    # To match behaviour with loading an empty flow plan from cache,
200                    # we don't show an error message here, but rather call _flow_done()
201                    return self._flow_done()
202            # We don't save the Plan after getting the next stage
203            # as it hasn't been successfully passed yet
204            try:
205                # This is the first time we actually access any attribute on the selected plan
206                # if the cached plan is from an older version, it might have different attributes
207                # in which case we just delete the plan and invalidate everything
208                next_binding = self.plan.next(self.request)
209            except Exception as exc:  # noqa
210                self._logger.warning(
211                    "f(exec): found incompatible flow plan, invalidating run", exc=exc
212                )
213                keys = cache.keys(f"{CACHE_PREFIX}*")
214                cache.delete_many(keys)
215                return self.stage_invalid()
216            if not next_binding:
217                self._logger.debug("f(exec): no more stages, flow is done.")
218                return self._flow_done()
219            self.current_binding = next_binding
220            self.current_stage = next_binding.stage
221            self._logger.debug(
222                "f(exec): Current stage",
223                current_stage=self.current_stage,
224                flow_slug=self.flow.slug,
225            )
226            try:
227                stage_cls = self.current_stage.view
228            except NotImplementedError as exc:
229                self._logger.debug("Error getting stage type", exc=exc)
230                return self.stage_invalid()
231            self.current_stage_view = stage_cls(self)
232            self.current_stage_view.args = self.args
233            self.current_stage_view.kwargs = self.kwargs
234            self.current_stage_view.request = request
235            try:
236                return super().dispatch(request)
237            except InvalidStageError as exc:
238                return self.stage_invalid(str(exc))

.dispatch() is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.

def handle_exception(self, exc: Exception) -> django.http.response.HttpResponse:
240    def handle_exception(self, exc: Exception) -> HttpResponse:
241        """Handle exception in stage execution"""
242        if settings.DEBUG or settings.TEST:
243            raise exc
244        self._logger.warning(exc)
245        if not should_ignore_exception(exc):
246            capture_exception(exc)
247            Event.new(
248                action=EventAction.SYSTEM_EXCEPTION,
249                message="System exception during flow execution.",
250            ).with_exception(exc).from_http(self.request)
251        challenge = FlowErrorChallenge(self.request, exc)
252        challenge.is_valid(raise_exception=True)
253        return to_stage_response(self.request, HttpChallengeResponse(challenge))

Handle exception in stage execution

@extend_schema(responses={200: PolymorphicProxySerializer(component_name='ChallengeTypes', serializers=challenge_types, resource_type_field_name='component')}, request=OpenApiTypes.NONE, parameters=[OpenApiParameter(name='query', location=OpenApiParameter.QUERY, required=True, description='Querystring as received', type=OpenApiTypes.STR)], operation_id='flows_executor_get')
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
255    @extend_schema(
256        responses={
257            200: PolymorphicProxySerializer(
258                component_name="ChallengeTypes",
259                serializers=challenge_types,
260                resource_type_field_name="component",
261            ),
262        },
263        request=OpenApiTypes.NONE,
264        parameters=[
265            OpenApiParameter(
266                name="query",
267                location=OpenApiParameter.QUERY,
268                required=True,
269                description="Querystring as received",
270                type=OpenApiTypes.STR,
271            )
272        ],
273        operation_id="flows_executor_get",
274    )
275    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
276        """Get the next pending challenge from the currently active flow."""
277        class_path = class_to_path(self.current_stage_view.__class__)
278        self._logger.debug(
279            "f(exec): Passing GET",
280            view_class=class_path,
281            stage=self.current_stage,
282        )
283        try:
284            with (
285                start_span(
286                    op="authentik.flow.executor.stage",
287                    name=class_path,
288                ) as span,
289                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
290                    method=request.method.upper(),
291                    stage_type=class_path,
292                ).time(),
293            ):
294                span.set_data("Method", request.method.upper())
295                span.set_data("authentik Stage", self.current_stage_view)
296                span.set_data("authentik Flow", self.flow.slug)
297                stage_response = self.current_stage_view.dispatch(request)
298                return to_stage_response(request, stage_response)
299        except Exception as exc:  # noqa
300            return self.handle_exception(exc)

Get the next pending challenge from the currently active flow.

@extend_schema(responses={200: PolymorphicProxySerializer(component_name='ChallengeTypes', serializers=challenge_types, resource_type_field_name='component')}, request=PolymorphicProxySerializer(component_name='FlowChallengeResponse', serializers=challenge_response_types, resource_type_field_name='component'), parameters=[OpenApiParameter(name='query', location=OpenApiParameter.QUERY, required=True, description='Querystring as received', type=OpenApiTypes.STR)], operation_id='flows_executor_solve')
def post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
302    @extend_schema(
303        responses={
304            200: PolymorphicProxySerializer(
305                component_name="ChallengeTypes",
306                serializers=challenge_types,
307                resource_type_field_name="component",
308            ),
309        },
310        request=PolymorphicProxySerializer(
311            component_name="FlowChallengeResponse",
312            serializers=challenge_response_types,
313            resource_type_field_name="component",
314        ),
315        parameters=[
316            OpenApiParameter(
317                name="query",
318                location=OpenApiParameter.QUERY,
319                required=True,
320                description="Querystring as received",
321                type=OpenApiTypes.STR,
322            )
323        ],
324        operation_id="flows_executor_solve",
325    )
326    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
327        """Solve the previously retrieved challenge and advanced to the next stage."""
328        class_path = class_to_path(self.current_stage_view.__class__)
329        self._logger.debug(
330            "f(exec): Passing POST",
331            view_class=class_path,
332            stage=self.current_stage,
333        )
334        try:
335            with (
336                start_span(
337                    op="authentik.flow.executor.stage",
338                    name=class_path,
339                ) as span,
340                HIST_FLOW_EXECUTION_STAGE_TIME.labels(
341                    method=request.method.upper(),
342                    stage_type=class_path,
343                ).time(),
344            ):
345                span.set_data("Method", request.method.upper())
346                span.set_data("authentik Stage", self.current_stage_view)
347                span.set_data("authentik Flow", self.flow.slug)
348                stage_response = self.current_stage_view.dispatch(request)
349                return to_stage_response(request, stage_response)
350        except Exception as exc:  # noqa
351            return self.handle_exception(exc)

Solve the previously retrieved challenge and advance to the next stage.

def restart_flow(self, keep_context=False) -> django.http.response.HttpResponse:
368    def restart_flow(self, keep_context=False) -> HttpResponse:
369        """Restart the currently active flow, optionally keeping the current context"""
370        planner = FlowPlanner(self.flow)
371        planner.use_cache = False
372        default_context = None
373        if keep_context:
374            default_context = self.plan.context
375        try:
376            plan = planner.plan(self.request, default_context)
377        except FlowNonApplicableException as exc:
378            self._logger.warning("f(exec): Flow restart not applicable to current user", exc=exc)
379            return self.handle_invalid_flow(exc)
380        self.request.session[SESSION_KEY_PLAN] = plan
381        kwargs = self.kwargs
382        kwargs.update({"flow_slug": self.flow.slug})
383        return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)

Restart the currently active flow, optionally keeping the current context

def stage_ok(self) -> django.http.response.HttpResponse:
407    def stage_ok(self) -> HttpResponse:
408        """Callback called by stages upon successful completion.
409        Persists updated plan and context to session."""
410        self._logger.debug(
411            "f(exec): Stage ok",
412            stage_class=class_to_path(self.current_stage_view.__class__),
413        )
414        if isinstance(self.current_stage_view, StageView):
415            self.current_stage_view.cleanup()
416        self.request.session.get(SESSION_KEY_HISTORY, []).append(deepcopy(self.plan))
417        self.plan.pop()
418        self.request.session[SESSION_KEY_PLAN] = self.plan
419        if self.plan.bindings:
420            self._logger.debug(
421                "f(exec): Continuing with next stage",
422                remaining=len(self.plan.bindings),
423            )
424            kwargs = self.kwargs
425            kwargs.update({"flow_slug": self.flow.slug})
426            return redirect_with_qs("authentik_api:flow-executor", self.request.GET, **kwargs)
427        # User passed all stages
428        self._logger.debug(
429            "f(exec): User passed all stages",
430            context=cleanse_dict(self.plan.context),
431        )
432        return self._flow_done()

Callback called by stages upon successful completion. Persists updated plan and context to session.

def stage_invalid( self, error_message: str | None = None) -> django.http.response.HttpResponse:
434    def stage_invalid(self, error_message: str | None = None) -> HttpResponse:
435        """Callback used stage when data is correct but a policy denies access
436        or the user account is disabled.
437
438        Optionally, an exception can be passed, which will be shown if the current user
439        is a superuser."""
440        self._logger.debug("f(exec): Stage invalid")
441        if self.current_binding and self.current_binding.invalid_response_action in [
442            InvalidResponseAction.RESTART,
443            InvalidResponseAction.RESTART_WITH_CONTEXT,
444        ]:
445            keep_context = (
446                self.current_binding.invalid_response_action
447                == InvalidResponseAction.RESTART_WITH_CONTEXT
448            )
449            self._logger.debug(
450                "f(exec): Invalid response, restarting flow",
451                keep_context=keep_context,
452            )
453            return self.restart_flow(keep_context)
454        self.cancel()
455        challenge_view = AccessDeniedStage(self, error_message)
456        challenge_view.request = self.request
457        return to_stage_response(self.request, challenge_view.get(self.request))

Callback used by a stage when data is correct but a policy denies access or the user account is disabled.

Optionally, an error message can be passed, which will be shown if the current user is a superuser.

def cancel(self):
459    def cancel(self):
460        """Cancel current flow execution"""
461        keys_to_delete = [
462            SESSION_KEY_PLAN,
463            SESSION_KEY_GET,
464            # We might need the initial POST payloads for later requests
465            # SESSION_KEY_POST,
466            # We don't delete the history on purpose, as a user might
467            # still be inspecting it.
468            # It's only deleted on a fresh executions
469            # SESSION_KEY_HISTORY,
470        ]
471        self._logger.debug("f(exec): cleaning up")
472        for key in keys_to_delete:
473            if key in self.request.session:
474                del self.request.session[key]

Cancel current flow execution

class CancelView(django.views.generic.base.View):
class CancelView(View):
    """View which cancels the currently active plan"""

    def get(self, request: HttpRequest) -> HttpResponse:
        """Cancel the currently active plan and redirect the user.

        The plan is removed from the session if one exists. If the `NEXT_ARG_NAME`
        query parameter is set and is not an absolute URL, the user is redirected
        there; otherwise the user is sent to the default invalidation flow."""
        if SESSION_KEY_PLAN in request.session:
            del request.session[SESSION_KEY_PLAN]
            LOGGER.debug("Canceled current plan")
        # Use the request argument consistently instead of mixing it with self.request,
        # which is only set once View.setup() has run
        next_url = request.GET.get(NEXT_ARG_NAME)
        # Absolute URLs are ignored; only non-absolute targets are followed
        if next_url and not is_url_absolute(next_url):
            return redirect(next_url)
        return redirect("authentik_flows:default-invalidation")

View which cancels the currently active plan

def get( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
480    def get(self, request: HttpRequest) -> HttpResponse:
481        """View which canels the currently active plan"""
482        if SESSION_KEY_PLAN in request.session:
483            del request.session[SESSION_KEY_PLAN]
484            LOGGER.debug("Canceled current plan")
485        next_url = self.request.GET.get(NEXT_ARG_NAME)
486        if next_url and not is_url_absolute(next_url):
487            return redirect(next_url)
488        return redirect("authentik_flows:default-invalidation")

View which cancels the currently active plan

class ToDefaultFlow(django.views.generic.base.View):
class ToDefaultFlow(View):
    """Redirect to default flow matching by designation"""

    # Designation of the flow to redirect to; handed to `get_flow` by `dispatch`
    designation: FlowDesignation | None = None

    @staticmethod
    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
        """Return the first flow matching `**flow_filter` (ordered by slug) whose policy
        engine result passes for `request`, or None when no flow passes."""
        for candidate in Flow.objects.filter(**flow_filter).order_by("slug"):
            policy_engine = PolicyEngine(candidate, request.user, request)
            policy_engine.build()
            outcome = policy_engine.result
            if not outcome.passing:
                LOGGER.warning(
                    "flow_by_policy: flow not passing",
                    flow=candidate,
                    messages=outcome.messages,
                )
                continue
            LOGGER.debug("flow_by_policy: flow passing", flow=candidate)
            return candidate
        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
        return None

    @staticmethod
    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
        """Get a flow for the selected designation.

        For the authentication and invalidation designations, the flow configured on
        the request's brand is preferred. If none is set, the first flow with that
        designation that passes its policies is used. Raises Http404 if no flow is
        found."""
        brand: Brand = request.brand
        # First, attempt to get default flow from brand
        if designation == FlowDesignation.AUTHENTICATION:
            brand_flow = brand.flow_authentication
        elif designation == FlowDesignation.INVALIDATION:
            brand_flow = brand.flow_invalidation
        else:
            brand_flow = None
        if brand_flow:
            return brand_flow
        # If no flow was set, get the first based on slug and policy
        fallback = ToDefaultFlow.flow_by_policy(request, designation=designation)
        if not fallback:
            # If we still don't have a flow, 404
            raise Http404
        return fallback

    def dispatch(self, request: HttpRequest) -> HttpResponse:
        """Redirect to the flow interface of the default flow for `self.designation`.

        A plan already stored in the session that belongs to a different flow is
        deleted before redirecting."""
        flow = ToDefaultFlow.get_flow(request, self.designation)
        # If user already has a pending plan, clear it so we don't have to later.
        # The session is read from the request argument, consistent with the rest of
        # this method, rather than from self.request.
        if SESSION_KEY_PLAN in request.session:
            plan: FlowPlan = request.session[SESSION_KEY_PLAN]
            if plan.flow_pk != flow.pk.hex:
                LOGGER.warning(
                    "f(def): Found existing plan for other flow, deleting plan",
                    flow_slug=flow.slug,
                )
                del request.session[SESSION_KEY_PLAN]
        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)

Redirect to default flow matching by designation

designation: authentik.flows.models.FlowDesignation | None = None
@staticmethod
def flow_by_policy( request: django.http.request.HttpRequest, **flow_filter) -> authentik.flows.models.Flow | None:
496    @staticmethod
497    def flow_by_policy(request: HttpRequest, **flow_filter) -> Flow | None:
498        """Get a Flow by `**flow_filter` and check if the request from `request` can access it."""
499        flows = Flow.objects.filter(**flow_filter).order_by("slug")
500        for flow in flows:
501            engine = PolicyEngine(flow, request.user, request)
502            engine.build()
503            result = engine.result
504            if result.passing:
505                LOGGER.debug("flow_by_policy: flow passing", flow=flow)
506                return flow
507            LOGGER.warning("flow_by_policy: flow not passing", flow=flow, messages=result.messages)
508        LOGGER.debug("flow_by_policy: no flow found", filters=flow_filter)
509        return None

Get a Flow by **flow_filter and check if the request from request can access it.

@staticmethod
def get_flow( request: django.http.request.HttpRequest, designation: authentik.flows.models.FlowDesignation) -> authentik.flows.models.Flow:
511    @staticmethod
512    def get_flow(request: HttpRequest, designation: FlowDesignation) -> Flow:
513        """Get a flow for the selected designation"""
514        brand: Brand = request.brand
515        flow = None
516        # First, attempt to get default flow from brand
517        if designation == FlowDesignation.AUTHENTICATION:
518            flow = brand.flow_authentication
519        elif designation == FlowDesignation.INVALIDATION:
520            flow = brand.flow_invalidation
521        if flow:
522            return flow
523        # If no flow was set, get the first based on slug and policy
524        flow = ToDefaultFlow.flow_by_policy(request, designation=designation)
525        if flow:
526            return flow
527        # If we still don't have a flow, 404
528        raise Http404

Get a flow for the selected designation

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
530    def dispatch(self, request: HttpRequest) -> HttpResponse:
531        flow = ToDefaultFlow.get_flow(request, self.designation)
532        # If user already has a pending plan, clear it so we don't have to later.
533        if SESSION_KEY_PLAN in self.request.session:
534            plan: FlowPlan = self.request.session[SESSION_KEY_PLAN]
535            if plan.flow_pk != flow.pk.hex:
536                LOGGER.warning(
537                    "f(def): Found existing plan for other flow, deleting plan",
538                    flow_slug=flow.slug,
539                )
540                del self.request.session[SESSION_KEY_PLAN]
541        return redirect_with_qs("authentik_core:if-flow", request.GET, flow_slug=flow.slug)
def to_stage_response( request: django.http.request.HttpRequest, source: django.http.response.HttpResponse) -> django.http.response.HttpResponse:
def to_stage_response(request: HttpRequest, source: HttpResponse) -> HttpResponse:
    """Convert a normal HttpResponse into a JSON challenge response.

    - Redirects (detected by class or by status code) become a RedirectChallenge,
      unless they target the full path of the current request, in which case the
      redirect is returned untouched.
    - A TemplateResponse is rendered and wrapped in a ShellChallenge.
    - A response whose class is exactly HttpResponse is wrapped in a ShellChallenge
      holding its decoded content.
    - Any other response is returned unchanged."""
    is_redirect = (
        isinstance(source, HttpResponseRedirect)
        or source.status_code == HttpResponseRedirect.status_code
    )
    if is_redirect:
        redirect_url = source["Location"]
        # Redirects to the same URL usually indicate an Error within a form
        if request.get_full_path() == redirect_url:
            return source
        target = str(redirect_url)
        LOGGER.debug(
            "converting to redirect challenge",
            to=target,
            current=request.path,
        )
        return HttpChallengeResponse(RedirectChallenge({"to": target}))
    if isinstance(source, TemplateResponse):
        rendered_body = source.render().content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": rendered_body}))
    # Check for actual HttpResponse (without isinstance as we don't want to check inheritance)
    if source.__class__ == HttpResponse:
        raw_body = source.content.decode("utf-8")
        return HttpChallengeResponse(ShellChallenge({"body": raw_body}))
    return source

Convert normal HttpResponse into JSON Response

class ConfigureFlowInitView(django.contrib.auth.mixins.LoginRequiredMixin, django.views.generic.base.View):
class ConfigureFlowInitView(LoginRequiredMixin, View):
    """Initiate the planner for the configure flow of a stage and redirect to the flow
    executor, or raise Http404 if no configure_flow has been set."""

    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
        """Plan the `configure_flow` of the stage identified by `stage_uuid`, with the
        requesting user as pending user, and redirect to that flow.

        Raises Http404 if the stage does not exist, does not inherit ConfigurableStage,
        has no configure_flow set, or if the planner reports the flow as not
        applicable."""
        try:
            target_stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
        except Stage.DoesNotExist as exc:
            raise Http404 from exc
        if not isinstance(target_stage, ConfigurableStage):
            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=target_stage)
            raise Http404
        configure_flow = target_stage.configure_flow
        if not configure_flow:
            LOGGER.debug("Stage has no configure_flow set", stage=target_stage)
            raise Http404

        try:
            planner = FlowPlanner(configure_flow)
            plan = planner.plan(request, {PLAN_CONTEXT_PENDING_USER: request.user})
        except FlowNonApplicableException:
            LOGGER.warning("Flow not applicable to user")
            raise Http404 from None
        return plan.to_redirect(request, configure_flow)

Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.

def get( self, request: django.http.request.HttpRequest, stage_uuid: str) -> django.http.response.HttpResponse:
590    def get(self, request: HttpRequest, stage_uuid: str) -> HttpResponse:
591        """Initiate planner for selected change flow and redirect to flow executor,
592        or raise Http404 if no configure_flow has been set."""
593        try:
594            stage: Stage = Stage.objects.get_subclass(pk=stage_uuid)
595        except Stage.DoesNotExist as exc:
596            raise Http404 from exc
597        if not isinstance(stage, ConfigurableStage):
598            LOGGER.debug("Stage does not inherit ConfigurableStage", stage=stage)
599            raise Http404
600        if not stage.configure_flow:
601            LOGGER.debug("Stage has no configure_flow set", stage=stage)
602            raise Http404
603
604        try:
605            plan = FlowPlanner(stage.configure_flow).plan(
606                request, {PLAN_CONTEXT_PENDING_USER: request.user}
607            )
608        except FlowNonApplicableException:
609            LOGGER.warning("Flow not applicable to user")
610            raise Http404 from None
611        return plan.to_redirect(request, stage.configure_flow)

Initiate planner for selected change flow and redirect to flow executor, or raise Http404 if no configure_flow has been set.