authentik.flows.views.inspector
Flow Inspector
"""Flow Inspector"""

from hashlib import sha256
from typing import Any

from django.conf import settings
from django.http import Http404
from django.http.request import HttpRequest
from django.http.response import HttpResponse
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.clickjacking import xframe_options_sameorigin
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.fields import BooleanField, ListField, SerializerMethodField
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from structlog.stdlib import BoundLogger, get_logger

from authentik.core.api.utils import PassiveSerializer
from authentik.events.utils import sanitize_dict
from authentik.flows.api.bindings import FlowStageBindingSerializer
from authentik.flows.models import Flow
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_HISTORY, SESSION_KEY_PLAN

# Minimum number of bindings a plan needs before a "next planned stage" exists.
MIN_FLOW_LENGTH = 2


class FlowInspectorPlanSerializer(PassiveSerializer):
    """Serializer for an active FlowPlan

    Every field is computed by the matching ``get_<field>`` method below.
    """

    current_stage = SerializerMethodField()
    next_planned_stage = SerializerMethodField(required=False)
    plan_context = SerializerMethodField()
    session_id = SerializerMethodField()

    # NOTE(review): the return annotations on the two stage getters name the
    # serializer class although the methods return its ``.data``; presumably
    # the annotation feeds OpenAPI schema generation -- confirm before changing.
    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Get the current stage

        Returns the serialized first binding of the plan. When the plan has no
        bindings, the data of an empty ``FlowStageBindingSerializer`` is
        returned instead of raising ``IndexError``, matching the fallback used
        by ``get_next_planned_stage``.
        """
        if not plan.bindings:
            return FlowStageBindingSerializer().data
        return FlowStageBindingSerializer(instance=plan.bindings[0]).data

    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Get the next planned stage

        Returns the serialized second binding of the plan, or the data of an
        empty ``FlowStageBindingSerializer`` when the plan holds fewer than
        ``MIN_FLOW_LENGTH`` bindings.
        """
        has_follow_up = len(plan.bindings) >= MIN_FLOW_LENGTH
        if has_follow_up:
            return FlowStageBindingSerializer(instance=plan.bindings[1]).data
        return FlowStageBindingSerializer().data

    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
        """Get the plan's context after passing it through ``sanitize_dict``"""
        sanitized_context = sanitize_dict(plan.context)
        return sanitized_context

    def get_session_id(self, _plan: FlowPlan) -> str:
        """Get the SHA-256 hex digest of the request's session key

        The plan argument is unused; the request is read from the serializer
        context under the ``"request"`` key.
        """
        # NOTE(review): ``.encode`` is called directly on ``session_key``; if the
        # key were ever ``None`` this would raise AttributeError -- confirm that
        # callers always have a session key.
        drf_request: Request = self.context["request"]
        django_session = drf_request._request.session
        raw_key = django_session.session_key.encode("ascii")
        return sha256(raw_key).hexdigest()


class FlowInspectionSerializer(PassiveSerializer):
    """Serializer for inspect endpoint"""

    plans = ListField(child=FlowInspectorPlanSerializer())
    current_plan = FlowInspectorPlanSerializer(required=False)
    is_completed = BooleanField()


@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowInspectorView(APIView):
    """Flow inspector API"""

    flow: Flow
    _logger: BoundLogger
    permission_classes = [IsAuthenticated]

    def setup(self, request: HttpRequest, flow_slug: str):
        """Resolve the flow for ``flow_slug`` and enforce inspector access.

        Raises ``Http404`` when no flow has that slug (via
        ``get_object_or_404``), or when ``settings.DEBUG`` is falsy and the
        user holds ``authentik_flows.inspect_flow`` neither on this flow nor
        globally.
        """
        super().setup(request, flow_slug=flow_slug)
        self._logger = get_logger().bind(flow_slug=flow_slug)
        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
        if settings.DEBUG:
            return
        user = request.user
        # The object-level check runs first; the global check only runs if it fails.
        may_inspect = user.has_perm(
            "authentik_flows.inspect_flow", self.flow
        ) or user.has_perm("authentik_flows.inspect_flow")
        if not may_inspect:
            raise Http404

    @extend_schema(
        responses={
            200: FlowInspectionSerializer(),
            400: OpenApiResponse(description="No flow plan in session."),
        },
        request=OpenApiTypes.NONE,
        operation_id="flows_inspector_get",
    )
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Get the inspector state for this flow from the current session

        The response lists every history plan recorded for this flow, the
        plan treated as current, and whether the flow is completed. Responds
        with HTTP 400 when the session holds neither an active plan nor any
        history.
        """
        history = request.session.get(SESSION_KEY_HISTORY, [])
        # Only history entries recorded for this flow are listed.
        serialized_history = [
            FlowInspectorPlanSerializer(
                instance=entry, context={"request": request}
            ).data
            for entry in history
            if entry.flow_pk == self.flow.pk.hex
        ]
        # NOTE(review): neither the active plan nor the history fallback is
        # filtered by flow here -- confirm that is intended.
        if SESSION_KEY_PLAN in request.session:
            active_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
            finished = False
        elif history:
            # No active plan: fall back to the newest history entry.
            active_plan = history[-1]
            finished = True
        else:
            return Response(status=400)
        active_serializer = FlowInspectorPlanSerializer(
            instance=active_plan, context={"request": request}
        )
        return Response(
            {
                "plans": serialized_history,
                "current_plan": active_serializer.data,
                "is_completed": finished,
            }
        )
MIN_FLOW_LENGTH =
2
class FlowInspectorPlanSerializer(PassiveSerializer):
    """Serializer for an active FlowPlan

    Every field is computed by the matching ``get_<field>`` method below.
    """

    current_stage = SerializerMethodField()
    next_planned_stage = SerializerMethodField(required=False)
    plan_context = SerializerMethodField()
    session_id = SerializerMethodField()

    # NOTE(review): the return annotations on the two stage getters name the
    # serializer class although the methods return its ``.data``; presumably
    # the annotation feeds OpenAPI schema generation -- confirm before changing.
    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Get the current stage

        Returns the serialized first binding of the plan. When the plan has no
        bindings, the data of an empty ``FlowStageBindingSerializer`` is
        returned instead of raising ``IndexError``, matching the fallback used
        by ``get_next_planned_stage``.
        """
        if not plan.bindings:
            return FlowStageBindingSerializer().data
        return FlowStageBindingSerializer(instance=plan.bindings[0]).data

    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Get the next planned stage

        Returns the serialized second binding of the plan, or the data of an
        empty ``FlowStageBindingSerializer`` when the plan holds fewer than
        ``MIN_FLOW_LENGTH`` bindings.
        """
        has_follow_up = len(plan.bindings) >= MIN_FLOW_LENGTH
        if has_follow_up:
            return FlowStageBindingSerializer(instance=plan.bindings[1]).data
        return FlowStageBindingSerializer().data

    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
        """Get the plan's context after passing it through ``sanitize_dict``"""
        sanitized_context = sanitize_dict(plan.context)
        return sanitized_context

    def get_session_id(self, _plan: FlowPlan) -> str:
        """Get the SHA-256 hex digest of the request's session key

        The plan argument is unused; the request is read from the serializer
        context under the ``"request"`` key.
        """
        # NOTE(review): ``.encode`` is called directly on ``session_key``; if the
        # key were ever ``None`` this would raise AttributeError -- confirm that
        # callers always have a session key.
        drf_request: Request = self.context["request"]
        django_session = drf_request._request.session
        raw_key = django_session.session_key.encode("ascii")
        return sha256(raw_key).hexdigest()
Serializer for an active FlowPlan
def
get_current_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
    """Get the current stage

    Returns the serialized first binding of the plan. When the plan has no
    bindings, the data of an empty ``FlowStageBindingSerializer`` is returned
    instead of raising ``IndexError``, matching the fallback used by
    ``get_next_planned_stage``.
    """
    # NOTE(review): the return annotation names the serializer class although
    # ``.data`` is returned; presumably it feeds OpenAPI schema generation --
    # confirm before changing it.
    if not plan.bindings:
        return FlowStageBindingSerializer().data
    return FlowStageBindingSerializer(instance=plan.bindings[0]).data
Get the current stage
def
get_next_planned_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
    """Get the next planned stage

    Returns the serialized second binding of the plan, or the data of an
    empty ``FlowStageBindingSerializer`` when the plan holds fewer than
    ``MIN_FLOW_LENGTH`` bindings.
    """
    has_follow_up = len(plan.bindings) >= MIN_FLOW_LENGTH
    if has_follow_up:
        return FlowStageBindingSerializer(instance=plan.bindings[1]).data
    return FlowStageBindingSerializer().data
Get the next planned stage
def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
    """Get the plan's context after passing it through ``sanitize_dict``"""
    sanitized_context = sanitize_dict(plan.context)
    return sanitized_context
Get the plan's context, sanitized
def get_session_id(self, _plan: FlowPlan) -> str:
    """Get the SHA-256 hex digest of the request's session key

    The plan argument is unused; the request is read from the serializer
    context under the ``"request"`` key.
    """
    # NOTE(review): ``.encode`` is called directly on ``session_key``; if the
    # key were ever ``None`` this would raise AttributeError -- confirm that
    # callers always have a session key.
    drf_request: Request = self.context["request"]
    django_session = drf_request._request.session
    raw_key = django_session.session_key.encode("ascii")
    return sha256(raw_key).hexdigest()
Get a unique session ID
Inherited Members
class FlowInspectionSerializer(PassiveSerializer):
    """Serializer for inspect endpoint

    Declared as the HTTP 200 response schema of ``FlowInspectorView.get``.
    """

    # List whose items each use the plan serializer.
    plans = ListField(child=FlowInspectorPlanSerializer())
    # A single nested plan; declared optional via ``required=False``.
    current_plan = FlowInspectorPlanSerializer(required=False)
    # Set by the view: False when an active plan is in the session, True when
    # it fell back to the newest history entry.
    is_completed = BooleanField()
Serializer for inspect endpoint
Inherited Members
@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowInspectorView(APIView):
    """Flow inspector API"""

    flow: Flow
    _logger: BoundLogger
    permission_classes = [IsAuthenticated]

    def setup(self, request: HttpRequest, flow_slug: str):
        """Resolve the flow for ``flow_slug`` and enforce inspector access.

        Raises ``Http404`` when no flow has that slug (via
        ``get_object_or_404``), or when ``settings.DEBUG`` is falsy and the
        user holds ``authentik_flows.inspect_flow`` neither on this flow nor
        globally.
        """
        super().setup(request, flow_slug=flow_slug)
        self._logger = get_logger().bind(flow_slug=flow_slug)
        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
        if settings.DEBUG:
            return
        user = request.user
        # The object-level check runs first; the global check only runs if it fails.
        may_inspect = user.has_perm(
            "authentik_flows.inspect_flow", self.flow
        ) or user.has_perm("authentik_flows.inspect_flow")
        if not may_inspect:
            raise Http404

    @extend_schema(
        responses={
            200: FlowInspectionSerializer(),
            400: OpenApiResponse(description="No flow plan in session."),
        },
        request=OpenApiTypes.NONE,
        operation_id="flows_inspector_get",
    )
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Get the inspector state for this flow from the current session

        The response lists every history plan recorded for this flow, the
        plan treated as current, and whether the flow is completed. Responds
        with HTTP 400 when the session holds neither an active plan nor any
        history.
        """
        history = request.session.get(SESSION_KEY_HISTORY, [])
        # Only history entries recorded for this flow are listed.
        serialized_history = [
            FlowInspectorPlanSerializer(
                instance=entry, context={"request": request}
            ).data
            for entry in history
            if entry.flow_pk == self.flow.pk.hex
        ]
        # NOTE(review): neither the active plan nor the history fallback is
        # filtered by flow here -- confirm that is intended.
        if SESSION_KEY_PLAN in request.session:
            active_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
            finished = False
        elif history:
            # No active plan: fall back to the newest history entry.
            active_plan = history[-1]
            finished = True
        else:
            return Response(status=400)
        active_serializer = FlowInspectorPlanSerializer(
            instance=active_plan, context={"request": request}
        )
        return Response(
            {
                "plans": serialized_history,
                "current_plan": active_serializer.data,
                "is_completed": finished,
            }
        )
Flow inspector API
def
setup(self, request: django.http.request.HttpRequest, flow_slug: str):
def setup(self, request: HttpRequest, flow_slug: str):
    """Resolve the flow for ``flow_slug`` and enforce inspector access.

    Raises ``Http404`` when no flow has that slug (via ``get_object_or_404``),
    or when ``settings.DEBUG`` is falsy and the user holds
    ``authentik_flows.inspect_flow`` neither on this flow nor globally.
    """
    super().setup(request, flow_slug=flow_slug)
    self._logger = get_logger().bind(flow_slug=flow_slug)
    self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
    if settings.DEBUG:
        return
    user = request.user
    # The object-level check runs first; the global check only runs if it fails.
    may_inspect = user.has_perm(
        "authentik_flows.inspect_flow", self.flow
    ) or user.has_perm("authentik_flows.inspect_flow")
    if not may_inspect:
        raise Http404
Initialize attributes shared by all view methods.
@extend_schema(responses={200: FlowInspectionSerializer(), 400: OpenApiResponse(description='No flow plan in session.')}, request=OpenApiTypes.NONE, operation_id='flows_inspector_get')
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
@extend_schema(
    responses={
        200: FlowInspectionSerializer(),
        400: OpenApiResponse(description="No flow plan in session."),
    },
    request=OpenApiTypes.NONE,
    operation_id="flows_inspector_get",
)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Get the inspector state for this flow from the current session

    The response lists every history plan recorded for this flow, the plan
    treated as current, and whether the flow is completed. Responds with
    HTTP 400 when the session holds neither an active plan nor any history.
    """
    history = request.session.get(SESSION_KEY_HISTORY, [])
    # Only history entries recorded for this flow are listed.
    serialized_history = [
        FlowInspectorPlanSerializer(
            instance=entry, context={"request": request}
        ).data
        for entry in history
        if entry.flow_pk == self.flow.pk.hex
    ]
    # NOTE(review): neither the active plan nor the history fallback is
    # filtered by flow here -- confirm that is intended.
    if SESSION_KEY_PLAN in request.session:
        active_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
        finished = False
    elif history:
        # No active plan: fall back to the newest history entry.
        active_plan = history[-1]
        finished = True
    else:
        return Response(status=400)
    active_serializer = FlowInspectorPlanSerializer(
        instance=active_plan, context={"request": request}
    )
    return Response(
        {
            "plans": serialized_history,
            "current_plan": active_serializer.data,
            "is_completed": finished,
        }
    )
Get current flow state and record it
def
dispatch(self, request, *args, **kwargs):
def dispatch(self, request, *args, **kwargs):
    """
    `.dispatch()` is pretty much the same as Django's regular dispatch,
    but with extra hooks for startup, finalize, and exception handling.

    The incoming request is replaced by the result of
    `initialize_request()`. Any exception raised by `initial()` or by the
    selected handler is converted into a response by `handle_exception()`.
    The response from `finalize_response()` is stored on `self.response`
    and returned.
    """
    self.args = args
    self.kwargs = kwargs
    # From here on `request` is whatever initialize_request() returned.
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        # Startup hook; runs before the handler is looked up.
        self.initial(request, *args, **kwargs)

        # Get the appropriate handler method
        if request.method.lower() in self.http_method_names:
            # A known method name without a matching attribute on the view
            # falls back to http_method_not_allowed.
            handler = getattr(self, request.method.lower(),
                              self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed

        response = handler(request, *args, **kwargs)

    except Exception as exc:
        # Broad catch: every exception from the startup hook or the handler
        # is routed through handle_exception().
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response
.dispatch() is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.