authentik.flows.views.inspector

Flow Inspector

  1"""Flow Inspector"""
  2
  3from hashlib import sha256
  4from typing import Any
  5
  6from django.conf import settings
  7from django.http import Http404
  8from django.http.request import HttpRequest
  9from django.http.response import HttpResponse
 10from django.shortcuts import get_object_or_404
 11from django.utils.decorators import method_decorator
 12from django.views.decorators.clickjacking import xframe_options_sameorigin
 13from drf_spectacular.types import OpenApiTypes
 14from drf_spectacular.utils import OpenApiResponse, extend_schema
 15from rest_framework.fields import BooleanField, ListField, SerializerMethodField
 16from rest_framework.permissions import IsAuthenticated
 17from rest_framework.request import Request
 18from rest_framework.response import Response
 19from rest_framework.views import APIView
 20from structlog.stdlib import BoundLogger, get_logger
 21
 22from authentik.core.api.utils import PassiveSerializer
 23from authentik.events.utils import sanitize_dict
 24from authentik.flows.api.bindings import FlowStageBindingSerializer
 25from authentik.flows.models import Flow
 26from authentik.flows.planner import FlowPlan
 27from authentik.flows.views.executor import SESSION_KEY_HISTORY, SESSION_KEY_PLAN
 28
 29MIN_FLOW_LENGTH = 2
 30
 31
 32class FlowInspectorPlanSerializer(PassiveSerializer):
 33    """Serializer for an active FlowPlan"""
 34
 35    current_stage = SerializerMethodField()
 36    next_planned_stage = SerializerMethodField(required=False)
 37    plan_context = SerializerMethodField()
 38    session_id = SerializerMethodField()
 39
 40    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
 41        """Get the current stage"""
 42        return FlowStageBindingSerializer(instance=plan.bindings[0]).data
 43
 44    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
 45        """Get the next planned stage"""
 46        if len(plan.bindings) < MIN_FLOW_LENGTH:
 47            return FlowStageBindingSerializer().data
 48        return FlowStageBindingSerializer(instance=plan.bindings[1]).data
 49
 50    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
 51        """Get the plan's context, sanitized"""
 52        return sanitize_dict(plan.context)
 53
 54    def get_session_id(self, _plan: FlowPlan) -> str:
 55        """Get a unique session ID"""
 56        request: Request = self.context["request"]
 57        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()
 58
 59
 60class FlowInspectionSerializer(PassiveSerializer):
 61    """Serializer for inspect endpoint"""
 62
 63    plans = ListField(child=FlowInspectorPlanSerializer())
 64    current_plan = FlowInspectorPlanSerializer(required=False)
 65    is_completed = BooleanField()
 66
 67
 68@method_decorator(xframe_options_sameorigin, name="dispatch")
 69class FlowInspectorView(APIView):
 70    """Flow inspector API"""
 71
 72    flow: Flow
 73    _logger: BoundLogger
 74    permission_classes = [IsAuthenticated]
 75
 76    def setup(self, request: HttpRequest, flow_slug: str):
 77        super().setup(request, flow_slug=flow_slug)
 78        self._logger = get_logger().bind(flow_slug=flow_slug)
 79        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
 80        if settings.DEBUG:
 81            return
 82        if request.user.has_perm(
 83            "authentik_flows.inspect_flow", self.flow
 84        ) or request.user.has_perm("authentik_flows.inspect_flow"):
 85            return
 86        raise Http404
 87
 88    @extend_schema(
 89        responses={
 90            200: FlowInspectionSerializer(),
 91            400: OpenApiResponse(description="No flow plan in session."),
 92        },
 93        request=OpenApiTypes.NONE,
 94        operation_id="flows_inspector_get",
 95    )
 96    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 97        """Get current flow state and record it"""
 98        plans = []
 99        for plan in request.session.get(SESSION_KEY_HISTORY, []):
100            plan: FlowPlan
101            if plan.flow_pk != self.flow.pk.hex:
102                continue
103            plan_serializer = FlowInspectorPlanSerializer(
104                instance=plan, context={"request": request}
105            )
106            plans.append(plan_serializer.data)
107        is_completed = False
108        if SESSION_KEY_PLAN in request.session:
109            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
110        else:
111            try:
112                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
113            except IndexError:
114                return Response(status=400)
115            is_completed = True
116        current_serializer = FlowInspectorPlanSerializer(
117            instance=current_plan, context={"request": request}
118        )
119        response = {
120            "plans": plans,
121            "current_plan": current_serializer.data,
122            "is_completed": is_completed,
123        }
124        return Response(response)
MIN_FLOW_LENGTH = 2
class FlowInspectorPlanSerializer(authentik.core.api.utils.PassiveSerializer):
33class FlowInspectorPlanSerializer(PassiveSerializer):
34    """Serializer for an active FlowPlan"""
35
36    current_stage = SerializerMethodField()
37    next_planned_stage = SerializerMethodField(required=False)
38    plan_context = SerializerMethodField()
39    session_id = SerializerMethodField()
40
41    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
42        """Get the current stage"""
43        return FlowStageBindingSerializer(instance=plan.bindings[0]).data
44
45    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
46        """Get the next planned stage"""
47        if len(plan.bindings) < MIN_FLOW_LENGTH:
48            return FlowStageBindingSerializer().data
49        return FlowStageBindingSerializer(instance=plan.bindings[1]).data
50
51    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
52        """Get the plan's context, sanitized"""
53        return sanitize_dict(plan.context)
54
55    def get_session_id(self, _plan: FlowPlan) -> str:
56        """Get a unique session ID"""
57        request: Request = self.context["request"]
58        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()

Serializer for an active FlowPlan

current_stage
next_planned_stage
plan_context
session_id
41    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
42        """Get the current stage"""
43        return FlowStageBindingSerializer(instance=plan.bindings[0]).data

Get the current stage

def get_next_planned_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
45    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
46        """Get the next planned stage"""
47        if len(plan.bindings) < MIN_FLOW_LENGTH:
48            return FlowStageBindingSerializer().data
49        return FlowStageBindingSerializer(instance=plan.bindings[1]).data

Get the next planned stage

def get_plan_context(self, plan: authentik.flows.planner.FlowPlan) -> dict[str, typing.Any]:
51    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
52        """Get the plan's context, sanitized"""
53        return sanitize_dict(plan.context)

Get the plan's context, sanitized

def get_session_id(self, _plan: authentik.flows.planner.FlowPlan) -> str:
55    def get_session_id(self, _plan: FlowPlan) -> str:
56        """Get a unique session ID"""
57        request: Request = self.context["request"]
58        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()

Get a unique session ID

class FlowInspectionSerializer(authentik.core.api.utils.PassiveSerializer):
61class FlowInspectionSerializer(PassiveSerializer):
62    """Serializer for inspect endpoint"""
63
64    plans = ListField(child=FlowInspectorPlanSerializer())
65    current_plan = FlowInspectorPlanSerializer(required=False)
66    is_completed = BooleanField()

Serializer for inspect endpoint

plans
current_plan
is_completed
@method_decorator(xframe_options_sameorigin, name='dispatch')
class FlowInspectorView(rest_framework.views.APIView):
 69@method_decorator(xframe_options_sameorigin, name="dispatch")
 70class FlowInspectorView(APIView):
 71    """Flow inspector API"""
 72
 73    flow: Flow
 74    _logger: BoundLogger
 75    permission_classes = [IsAuthenticated]
 76
 77    def setup(self, request: HttpRequest, flow_slug: str):
 78        super().setup(request, flow_slug=flow_slug)
 79        self._logger = get_logger().bind(flow_slug=flow_slug)
 80        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
 81        if settings.DEBUG:
 82            return
 83        if request.user.has_perm(
 84            "authentik_flows.inspect_flow", self.flow
 85        ) or request.user.has_perm("authentik_flows.inspect_flow"):
 86            return
 87        raise Http404
 88
 89    @extend_schema(
 90        responses={
 91            200: FlowInspectionSerializer(),
 92            400: OpenApiResponse(description="No flow plan in session."),
 93        },
 94        request=OpenApiTypes.NONE,
 95        operation_id="flows_inspector_get",
 96    )
 97    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 98        """Get current flow state and record it"""
 99        plans = []
100        for plan in request.session.get(SESSION_KEY_HISTORY, []):
101            plan: FlowPlan
102            if plan.flow_pk != self.flow.pk.hex:
103                continue
104            plan_serializer = FlowInspectorPlanSerializer(
105                instance=plan, context={"request": request}
106            )
107            plans.append(plan_serializer.data)
108        is_completed = False
109        if SESSION_KEY_PLAN in request.session:
110            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
111        else:
112            try:
113                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
114            except IndexError:
115                return Response(status=400)
116            is_completed = True
117        current_serializer = FlowInspectorPlanSerializer(
118            instance=current_plan, context={"request": request}
119        )
120        response = {
121            "plans": plans,
122            "current_plan": current_serializer.data,
123            "is_completed": is_completed,
124        }
125        return Response(response)

Flow inspector API

permission_classes = [<class 'rest_framework.permissions.IsAuthenticated'>]
def setup(self, request: django.http.request.HttpRequest, flow_slug: str):
77    def setup(self, request: HttpRequest, flow_slug: str):
78        super().setup(request, flow_slug=flow_slug)
79        self._logger = get_logger().bind(flow_slug=flow_slug)
80        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
81        if settings.DEBUG:
82            return
83        if request.user.has_perm(
84            "authentik_flows.inspect_flow", self.flow
85        ) or request.user.has_perm("authentik_flows.inspect_flow"):
86            return
87        raise Http404

Initialize attributes shared by all view methods.

@extend_schema(responses={200: FlowInspectionSerializer(), 400: OpenApiResponse(description='No flow plan in session.')}, request=OpenApiTypes.NONE, operation_id='flows_inspector_get')
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
 89    @extend_schema(
 90        responses={
 91            200: FlowInspectionSerializer(),
 92            400: OpenApiResponse(description="No flow plan in session."),
 93        },
 94        request=OpenApiTypes.NONE,
 95        operation_id="flows_inspector_get",
 96    )
 97    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 98        """Get current flow state and record it"""
 99        plans = []
100        for plan in request.session.get(SESSION_KEY_HISTORY, []):
101            plan: FlowPlan
102            if plan.flow_pk != self.flow.pk.hex:
103                continue
104            plan_serializer = FlowInspectorPlanSerializer(
105                instance=plan, context={"request": request}
106            )
107            plans.append(plan_serializer.data)
108        is_completed = False
109        if SESSION_KEY_PLAN in request.session:
110            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
111        else:
112            try:
113                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
114            except IndexError:
115                return Response(status=400)
116            is_completed = True
117        current_serializer = FlowInspectorPlanSerializer(
118            instance=current_plan, context={"request": request}
119        )
120        response = {
121            "plans": plans,
122            "current_plan": current_serializer.data,
123            "is_completed": is_completed,
124        }
125        return Response(response)

Get current flow state and record it

def dispatch(self, request, *args, **kwargs):
492    def dispatch(self, request, *args, **kwargs):
493        """
494        `.dispatch()` is pretty much the same as Django's regular dispatch,
495        but with extra hooks for startup, finalize, and exception handling.
496        """
497        self.args = args
498        self.kwargs = kwargs
499        request = self.initialize_request(request, *args, **kwargs)
500        self.request = request
501        self.headers = self.default_response_headers  # deprecate?
502
503        try:
504            self.initial(request, *args, **kwargs)
505
506            # Get the appropriate handler method
507            if request.method.lower() in self.http_method_names:
508                handler = getattr(self, request.method.lower(),
509                                  self.http_method_not_allowed)
510            else:
511                handler = self.http_method_not_allowed
512
513            response = handler(request, *args, **kwargs)
514
515        except Exception as exc:
516            response = self.handle_exception(exc)
517
518        self.response = self.finalize_response(request, response, *args, **kwargs)
519        return self.response

.dispatch() is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.