authentik.flows.views.inspector

Flow Inspector

  1"""Flow Inspector"""
  2
  3from hashlib import sha256
  4from typing import Any
  5
  6from django.conf import settings
  7from django.http import Http404
  8from django.http.request import HttpRequest
  9from django.http.response import HttpResponse
 10from django.shortcuts import get_object_or_404
 11from django.utils.decorators import method_decorator
 12from django.views.decorators.clickjacking import xframe_options_sameorigin
 13from drf_spectacular.types import OpenApiTypes
 14from drf_spectacular.utils import OpenApiResponse, extend_schema
 15from rest_framework.fields import BooleanField, ListField, SerializerMethodField
 16from rest_framework.permissions import IsAuthenticated
 17from rest_framework.request import Request
 18from rest_framework.response import Response
 19from rest_framework.views import APIView
 20from structlog.stdlib import BoundLogger, get_logger
 21
 22from authentik.core.api.utils import PassiveSerializer
 23from authentik.events.utils import sanitize_dict
 24from authentik.flows.api.bindings import FlowStageBindingSerializer
 25from authentik.flows.models import Flow
 26from authentik.flows.planner import FlowPlan
 27from authentik.flows.views.executor import SESSION_KEY_HISTORY, SESSION_KEY_PLAN
 28
 29MIN_FLOW_LENGTH = 2
 30
 31
 32class FlowInspectorPlanSerializer(PassiveSerializer):
 33    """Serializer for an active FlowPlan"""
 34
 35    current_stage = SerializerMethodField()
 36    next_planned_stage = SerializerMethodField(required=False)
 37    plan_context = SerializerMethodField()
 38    session_id = SerializerMethodField()
 39
 40    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
 41        """Get the current stage"""
 42        return FlowStageBindingSerializer(instance=plan.bindings[0]).data
 43
 44    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
 45        """Get the next planned stage"""
 46        if len(plan.bindings) < MIN_FLOW_LENGTH:
 47            return FlowStageBindingSerializer().data
 48        return FlowStageBindingSerializer(instance=plan.bindings[1]).data
 49
 50    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
 51        """Get the plan's context, sanitized"""
 52        return sanitize_dict(plan.context)
 53
 54    def get_session_id(self, _plan: FlowPlan) -> str:
 55        """Get a unique session ID"""
 56        request: Request = self.context["request"]
 57        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()
 58
 59
 60class FlowInspectionSerializer(PassiveSerializer):
 61    """Serializer for inspect endpoint"""
 62
 63    plans = ListField(child=FlowInspectorPlanSerializer())
 64    current_plan = FlowInspectorPlanSerializer(required=False)
 65    is_completed = BooleanField()
 66
 67
 68@method_decorator(xframe_options_sameorigin, name="dispatch")
 69class FlowInspectorView(APIView):
 70    """Flow inspector API"""
 71
 72    flow: Flow
 73    _logger: BoundLogger
 74
 75    def get_permissions(self):
 76        if settings.DEBUG:
 77            return []
 78        return [IsAuthenticated()]
 79
 80    def setup(self, request: HttpRequest, flow_slug: str):
 81        super().setup(request, flow_slug=flow_slug)
 82        self._logger = get_logger().bind(flow_slug=flow_slug)
 83        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
 84        if settings.DEBUG:
 85            return
 86        if request.user.has_perm(
 87            "authentik_flows.inspect_flow", self.flow
 88        ) or request.user.has_perm("authentik_flows.inspect_flow"):
 89            return
 90        raise Http404
 91
 92    @extend_schema(
 93        responses={
 94            200: FlowInspectionSerializer(),
 95            400: OpenApiResponse(description="No flow plan in session."),
 96        },
 97        request=OpenApiTypes.NONE,
 98        operation_id="flows_inspector_get",
 99    )
100    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
101        """Get current flow state and record it"""
102        plans = []
103        for plan in request.session.get(SESSION_KEY_HISTORY, []):
104            plan: FlowPlan
105            if plan.flow_pk != self.flow.pk.hex:
106                continue
107            plan_serializer = FlowInspectorPlanSerializer(
108                instance=plan, context={"request": request}
109            )
110            plans.append(plan_serializer.data)
111        is_completed = False
112        if SESSION_KEY_PLAN in request.session:
113            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
114        else:
115            try:
116                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
117            except IndexError:
118                return Response(status=400)
119            is_completed = True
120        current_serializer = FlowInspectorPlanSerializer(
121            instance=current_plan, context={"request": request}
122        )
123        response = {
124            "plans": plans,
125            "current_plan": current_serializer.data,
126            "is_completed": is_completed,
127        }
128        return Response(response)
MIN_FLOW_LENGTH = 2
class FlowInspectorPlanSerializer(authentik.core.api.utils.PassiveSerializer):
33class FlowInspectorPlanSerializer(PassiveSerializer):
34    """Serializer for an active FlowPlan"""
35
36    current_stage = SerializerMethodField()
37    next_planned_stage = SerializerMethodField(required=False)
38    plan_context = SerializerMethodField()
39    session_id = SerializerMethodField()
40
41    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
42        """Get the current stage"""
43        return FlowStageBindingSerializer(instance=plan.bindings[0]).data
44
45    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
46        """Get the next planned stage"""
47        if len(plan.bindings) < MIN_FLOW_LENGTH:
48            return FlowStageBindingSerializer().data
49        return FlowStageBindingSerializer(instance=plan.bindings[1]).data
50
51    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
52        """Get the plan's context, sanitized"""
53        return sanitize_dict(plan.context)
54
55    def get_session_id(self, _plan: FlowPlan) -> str:
56        """Get a unique session ID"""
57        request: Request = self.context["request"]
58        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()

Serializer for an active FlowPlan

current_stage
next_planned_stage
plan_context
session_id
41    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
42        """Get the current stage"""
43        return FlowStageBindingSerializer(instance=plan.bindings[0]).data

Get the current stage

def get_next_planned_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
45    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
46        """Get the next planned stage"""
47        if len(plan.bindings) < MIN_FLOW_LENGTH:
48            return FlowStageBindingSerializer().data
49        return FlowStageBindingSerializer(instance=plan.bindings[1]).data

Get the next planned stage

def get_plan_context(self, plan: authentik.flows.planner.FlowPlan) -> dict[str, typing.Any]:
51    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
52        """Get the plan's context, sanitized"""
53        return sanitize_dict(plan.context)

Get the plan's context, sanitized

def get_session_id(self, _plan: authentik.flows.planner.FlowPlan) -> str:
55    def get_session_id(self, _plan: FlowPlan) -> str:
56        """Get a unique session ID"""
57        request: Request = self.context["request"]
58        return sha256(request._request.session.session_key.encode("ascii")).hexdigest()

Get a unique session ID

class FlowInspectionSerializer(authentik.core.api.utils.PassiveSerializer):
61class FlowInspectionSerializer(PassiveSerializer):
62    """Serializer for inspect endpoint"""
63
64    plans = ListField(child=FlowInspectorPlanSerializer())
65    current_plan = FlowInspectorPlanSerializer(required=False)
66    is_completed = BooleanField()

Serializer for inspect endpoint

plans
current_plan
is_completed
@method_decorator(xframe_options_sameorigin, name='dispatch')
class FlowInspectorView(rest_framework.views.APIView):
 69@method_decorator(xframe_options_sameorigin, name="dispatch")
 70class FlowInspectorView(APIView):
 71    """Flow inspector API"""
 72
 73    flow: Flow
 74    _logger: BoundLogger
 75
 76    def get_permissions(self):
 77        if settings.DEBUG:
 78            return []
 79        return [IsAuthenticated()]
 80
 81    def setup(self, request: HttpRequest, flow_slug: str):
 82        super().setup(request, flow_slug=flow_slug)
 83        self._logger = get_logger().bind(flow_slug=flow_slug)
 84        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
 85        if settings.DEBUG:
 86            return
 87        if request.user.has_perm(
 88            "authentik_flows.inspect_flow", self.flow
 89        ) or request.user.has_perm("authentik_flows.inspect_flow"):
 90            return
 91        raise Http404
 92
 93    @extend_schema(
 94        responses={
 95            200: FlowInspectionSerializer(),
 96            400: OpenApiResponse(description="No flow plan in session."),
 97        },
 98        request=OpenApiTypes.NONE,
 99        operation_id="flows_inspector_get",
100    )
101    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
102        """Get current flow state and record it"""
103        plans = []
104        for plan in request.session.get(SESSION_KEY_HISTORY, []):
105            plan: FlowPlan
106            if plan.flow_pk != self.flow.pk.hex:
107                continue
108            plan_serializer = FlowInspectorPlanSerializer(
109                instance=plan, context={"request": request}
110            )
111            plans.append(plan_serializer.data)
112        is_completed = False
113        if SESSION_KEY_PLAN in request.session:
114            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
115        else:
116            try:
117                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
118            except IndexError:
119                return Response(status=400)
120            is_completed = True
121        current_serializer = FlowInspectorPlanSerializer(
122            instance=current_plan, context={"request": request}
123        )
124        response = {
125            "plans": plans,
126            "current_plan": current_serializer.data,
127            "is_completed": is_completed,
128        }
129        return Response(response)

Flow inspector API

def get_permissions(self):
76    def get_permissions(self):
77        if settings.DEBUG:
78            return []
79        return [IsAuthenticated()]

Instantiates and returns the list of permissions that this view requires.

def setup(self, request: django.http.request.HttpRequest, flow_slug: str):
81    def setup(self, request: HttpRequest, flow_slug: str):
82        super().setup(request, flow_slug=flow_slug)
83        self._logger = get_logger().bind(flow_slug=flow_slug)
84        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
85        if settings.DEBUG:
86            return
87        if request.user.has_perm(
88            "authentik_flows.inspect_flow", self.flow
89        ) or request.user.has_perm("authentik_flows.inspect_flow"):
90            return
91        raise Http404

Initialize attributes shared by all view methods.

@extend_schema(responses={200: FlowInspectionSerializer(), 400: OpenApiResponse(description='No flow plan in session.')}, request=OpenApiTypes.NONE, operation_id='flows_inspector_get')
def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
 93    @extend_schema(
 94        responses={
 95            200: FlowInspectionSerializer(),
 96            400: OpenApiResponse(description="No flow plan in session."),
 97        },
 98        request=OpenApiTypes.NONE,
 99        operation_id="flows_inspector_get",
100    )
101    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
102        """Get current flow state and record it"""
103        plans = []
104        for plan in request.session.get(SESSION_KEY_HISTORY, []):
105            plan: FlowPlan
106            if plan.flow_pk != self.flow.pk.hex:
107                continue
108            plan_serializer = FlowInspectorPlanSerializer(
109                instance=plan, context={"request": request}
110            )
111            plans.append(plan_serializer.data)
112        is_completed = False
113        if SESSION_KEY_PLAN in request.session:
114            current_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
115        else:
116            try:
117                current_plan = request.session.get(SESSION_KEY_HISTORY, [])[-1]
118            except IndexError:
119                return Response(status=400)
120            is_completed = True
121        current_serializer = FlowInspectorPlanSerializer(
122            instance=current_plan, context={"request": request}
123        )
124        response = {
125            "plans": plans,
126            "current_plan": current_serializer.data,
127            "is_completed": is_completed,
128        }
129        return Response(response)

Get current flow state and record it

def dispatch(self, request, *args, **kwargs):
492    def dispatch(self, request, *args, **kwargs):
493        """
494        `.dispatch()` is pretty much the same as Django's regular dispatch,
495        but with extra hooks for startup, finalize, and exception handling.
496        """
497        self.args = args
498        self.kwargs = kwargs
499        request = self.initialize_request(request, *args, **kwargs)
500        self.request = request
501        self.headers = self.default_response_headers  # deprecate?
502
503        try:
504            self.initial(request, *args, **kwargs)
505
506            # Get the appropriate handler method
507            if request.method.lower() in self.http_method_names:
508                handler = getattr(self, request.method.lower(),
509                                  self.http_method_not_allowed)
510            else:
511                handler = self.http_method_not_allowed
512
513            response = handler(request, *args, **kwargs)
514
515        except Exception as exc:
516            response = self.handle_exception(exc)
517
518        self.response = self.finalize_response(request, response, *args, **kwargs)
519        return self.response

.dispatch() is pretty much the same as Django's regular dispatch, but with extra hooks for startup, finalize, and exception handling.