authentik.flows.views.inspector
Flow Inspector
"""Flow Inspector"""

from hashlib import sha256
from typing import Any

from django.conf import settings
from django.http import Http404
from django.http.request import HttpRequest
from django.http.response import HttpResponse
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.clickjacking import xframe_options_sameorigin
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.fields import BooleanField, ListField, SerializerMethodField
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from structlog.stdlib import BoundLogger, get_logger

from authentik.core.api.utils import PassiveSerializer
from authentik.events.utils import sanitize_dict
from authentik.flows.api.bindings import FlowStageBindingSerializer
from authentik.flows.models import Flow
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_HISTORY, SESSION_KEY_PLAN

# A plan needs at least this many bindings to have a "next planned stage"
MIN_FLOW_LENGTH = 2


class FlowInspectorPlanSerializer(PassiveSerializer):
    """Serialize a FlowPlan for the flow inspector"""

    current_stage = SerializerMethodField()
    next_planned_stage = SerializerMethodField(required=False)
    plan_context = SerializerMethodField()
    session_id = SerializerMethodField()

    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Serialize the plan's first binding as the current stage"""
        current_binding = plan.bindings[0]
        return FlowStageBindingSerializer(instance=current_binding).data

    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Serialize the plan's second binding as the next planned stage

        When the plan holds fewer than MIN_FLOW_LENGTH bindings, the data of a
        FlowStageBindingSerializer without an instance is returned instead."""
        has_next = len(plan.bindings) >= MIN_FLOW_LENGTH
        binding_serializer = (
            FlowStageBindingSerializer(instance=plan.bindings[1])
            if has_next
            else FlowStageBindingSerializer()
        )
        return binding_serializer.data

    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
        """Return the plan's context after passing it through sanitize_dict"""
        sanitized_context = sanitize_dict(plan.context)
        return sanitized_context

    def get_session_id(self, _plan: FlowPlan) -> str:
        """Return the hex-encoded SHA-256 digest of the request's session key"""
        drf_request: Request = self.context["request"]
        session_key = drf_request._request.session.session_key
        digest = sha256(session_key.encode("ascii"))
        return digest.hexdigest()


class FlowInspectionSerializer(PassiveSerializer):
    """Serializer for inspect endpoint"""

    # List of plans, each rendered by FlowInspectorPlanSerializer
    plans = ListField(child=FlowInspectorPlanSerializer())
    # A single plan rendered by FlowInspectorPlanSerializer; not required
    current_plan = FlowInspectorPlanSerializer(required=False)
    # Boolean completion flag
    is_completed = BooleanField()


# xframe_options_sameorigin is applied to dispatch(), i.e. to every request of this view
@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowInspectorView(APIView):
    """Flow inspector API"""

    # Both attributes are assigned in setup()
    flow: Flow
    _logger: BoundLogger

    def get_permissions(self):
        """Return no permissions when settings.DEBUG is on, else require IsAuthenticated"""
        return [] if settings.DEBUG else [IsAuthenticated()]

    def setup(self, request: HttpRequest, flow_slug: str):
        """Resolve the flow from its slug and enforce the inspect permission

        Raises Http404 when no flow matches the slug (via get_object_or_404) or,
        outside of DEBUG mode, when the user holds `authentik_flows.inspect_flow`
        neither for this flow object nor without an object."""
        super().setup(request, flow_slug=flow_slug)
        self._logger = get_logger().bind(flow_slug=flow_slug)
        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
        permission = "authentik_flows.inspect_flow"
        # Short-circuits left to right: DEBUG skips both permission lookups
        allowed = (
            settings.DEBUG
            or request.user.has_perm(permission, self.flow)
            or request.user.has_perm(permission)
        )
        if not allowed:
            raise Http404

    @extend_schema(
        responses={
            200: FlowInspectionSerializer(),
            400: OpenApiResponse(description="No flow plan in session."),
        },
        request=OpenApiTypes.NONE,
        operation_id="flows_inspector_get",
    )
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Return the inspector state for this flow from the current session

        The payload holds the serialized history plans that belong to this flow, the
        current plan and an `is_completed` flag. The current plan is the session's
        active plan when there is one; otherwise the most recent history entry is used
        and `is_completed` is true. Responds with HTTP 400 when the session has neither
        an active plan nor any history."""

        def serialize(flow_plan: FlowPlan):
            # Every plan is rendered with the request in the serializer context
            return FlowInspectorPlanSerializer(
                instance=flow_plan, context={"request": request}
            ).data

        history = request.session.get(SESSION_KEY_HISTORY, [])
        # Only history entries recorded for this flow are listed
        plans = [
            serialize(past_plan)
            for past_plan in history
            if past_plan.flow_pk == self.flow.pk.hex
        ]
        is_completed = SESSION_KEY_PLAN not in request.session
        if is_completed:
            try:
                # The fallback takes the last history entry, whatever flow it belongs to
                current_plan = history[-1]
            except IndexError:
                return Response(status=400)
        else:
            current_plan = request.session[SESSION_KEY_PLAN]
        return Response(
            {
                "plans": plans,
                "current_plan": serialize(current_plan),
                "is_completed": is_completed,
            }
        )
MIN_FLOW_LENGTH =
2
class FlowInspectorPlanSerializer(PassiveSerializer):
    """Serialize a FlowPlan for the flow inspector"""

    current_stage = SerializerMethodField()
    next_planned_stage = SerializerMethodField(required=False)
    plan_context = SerializerMethodField()
    session_id = SerializerMethodField()

    def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Serialize the plan's first binding as the current stage"""
        current_binding = plan.bindings[0]
        return FlowStageBindingSerializer(instance=current_binding).data

    def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
        """Serialize the plan's second binding as the next planned stage

        When the plan holds fewer than MIN_FLOW_LENGTH bindings, the data of a
        FlowStageBindingSerializer without an instance is returned instead."""
        has_next = len(plan.bindings) >= MIN_FLOW_LENGTH
        binding_serializer = (
            FlowStageBindingSerializer(instance=plan.bindings[1])
            if has_next
            else FlowStageBindingSerializer()
        )
        return binding_serializer.data

    def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
        """Return the plan's context after passing it through sanitize_dict"""
        sanitized_context = sanitize_dict(plan.context)
        return sanitized_context

    def get_session_id(self, _plan: FlowPlan) -> str:
        """Return the hex-encoded SHA-256 digest of the request's session key"""
        drf_request: Request = self.context["request"]
        session_key = drf_request._request.session.session_key
        digest = sha256(session_key.encode("ascii"))
        return digest.hexdigest()
Serializer for an active FlowPlan
def
get_current_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
def get_current_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
    """Serialize the plan's first binding as the current stage"""
    current_binding = plan.bindings[0]
    return FlowStageBindingSerializer(instance=current_binding).data
Get the current stage
def
get_next_planned_stage( self, plan: authentik.flows.planner.FlowPlan) -> authentik.flows.api.bindings.FlowStageBindingSerializer:
def get_next_planned_stage(self, plan: FlowPlan) -> FlowStageBindingSerializer:
    """Serialize the plan's second binding as the next planned stage

    When the plan holds fewer than MIN_FLOW_LENGTH bindings, the data of a
    FlowStageBindingSerializer without an instance is returned instead."""
    has_next = len(plan.bindings) >= MIN_FLOW_LENGTH
    binding_serializer = (
        FlowStageBindingSerializer(instance=plan.bindings[1])
        if has_next
        else FlowStageBindingSerializer()
    )
    return binding_serializer.data
Get the next planned stage
def get_plan_context(self, plan: FlowPlan) -> dict[str, Any]:
    """Return the plan's context after passing it through sanitize_dict"""
    sanitized_context = sanitize_dict(plan.context)
    return sanitized_context
Get the plan's context, sanitized
def get_session_id(self, _plan: FlowPlan) -> str:
    """Return the hex-encoded SHA-256 digest of the request's session key"""
    drf_request: Request = self.context["request"]
    session_key = drf_request._request.session.session_key
    digest = sha256(session_key.encode("ascii"))
    return digest.hexdigest()
Get a unique session ID
Inherited Members
class FlowInspectionSerializer(PassiveSerializer):
    """Serializer for inspect endpoint"""

    # List of plans, each rendered by FlowInspectorPlanSerializer
    plans = ListField(child=FlowInspectorPlanSerializer())
    # A single plan rendered by FlowInspectorPlanSerializer; not required
    current_plan = FlowInspectorPlanSerializer(required=False)
    # Boolean completion flag
    is_completed = BooleanField()
Serializer for inspect endpoint
Inherited Members
@method_decorator(xframe_options_sameorigin, name='dispatch')
class
# xframe_options_sameorigin is applied to dispatch(), i.e. to every request of this view
@method_decorator(xframe_options_sameorigin, name="dispatch")
class FlowInspectorView(APIView):
    """Flow inspector API"""

    # Both attributes are assigned in setup()
    flow: Flow
    _logger: BoundLogger

    def get_permissions(self):
        """Return no permissions when settings.DEBUG is on, else require IsAuthenticated"""
        return [] if settings.DEBUG else [IsAuthenticated()]

    def setup(self, request: HttpRequest, flow_slug: str):
        """Resolve the flow from its slug and enforce the inspect permission

        Raises Http404 when no flow matches the slug (via get_object_or_404) or,
        outside of DEBUG mode, when the user holds `authentik_flows.inspect_flow`
        neither for this flow object nor without an object."""
        super().setup(request, flow_slug=flow_slug)
        self._logger = get_logger().bind(flow_slug=flow_slug)
        self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
        permission = "authentik_flows.inspect_flow"
        # Short-circuits left to right: DEBUG skips both permission lookups
        allowed = (
            settings.DEBUG
            or request.user.has_perm(permission, self.flow)
            or request.user.has_perm(permission)
        )
        if not allowed:
            raise Http404

    @extend_schema(
        responses={
            200: FlowInspectionSerializer(),
            400: OpenApiResponse(description="No flow plan in session."),
        },
        request=OpenApiTypes.NONE,
        operation_id="flows_inspector_get",
    )
    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Return the inspector state for this flow from the current session

        The payload holds the serialized history plans that belong to this flow, the
        current plan and an `is_completed` flag. The current plan is the session's
        active plan when there is one; otherwise the most recent history entry is used
        and `is_completed` is true. Responds with HTTP 400 when the session has neither
        an active plan nor any history."""

        def serialize(flow_plan: FlowPlan):
            # Every plan is rendered with the request in the serializer context
            return FlowInspectorPlanSerializer(
                instance=flow_plan, context={"request": request}
            ).data

        history = request.session.get(SESSION_KEY_HISTORY, [])
        # Only history entries recorded for this flow are listed
        plans = [
            serialize(past_plan)
            for past_plan in history
            if past_plan.flow_pk == self.flow.pk.hex
        ]
        is_completed = SESSION_KEY_PLAN not in request.session
        if is_completed:
            try:
                # The fallback takes the last history entry, whatever flow it belongs to
                current_plan = history[-1]
            except IndexError:
                return Response(status=400)
        else:
            current_plan = request.session[SESSION_KEY_PLAN]
        return Response(
            {
                "plans": plans,
                "current_plan": serialize(current_plan),
                "is_completed": is_completed,
            }
        )
Flow inspector API
def
get_permissions(self):
Instantiates and returns the list of permissions that this view requires.
def
setup(self, request: django.http.request.HttpRequest, flow_slug: str):
def setup(self, request: HttpRequest, flow_slug: str):
    """Resolve the flow from its slug and enforce the inspect permission

    Raises Http404 when no flow matches the slug (via get_object_or_404) or,
    outside of DEBUG mode, when the user holds `authentik_flows.inspect_flow`
    neither for this flow object nor without an object."""
    super().setup(request, flow_slug=flow_slug)
    self._logger = get_logger().bind(flow_slug=flow_slug)
    self.flow = get_object_or_404(Flow.objects.select_related(), slug=flow_slug)
    permission = "authentik_flows.inspect_flow"
    # Short-circuits left to right: DEBUG skips both permission lookups
    allowed = (
        settings.DEBUG
        or request.user.has_perm(permission, self.flow)
        or request.user.has_perm(permission)
    )
    if not allowed:
        raise Http404
Initialize attributes shared by all view methods.
@extend_schema(responses={200: FlowInspectionSerializer(), 400: OpenApiResponse(description='No flow plan in session.')}, request=OpenApiTypes.NONE, operation_id='flows_inspector_get')
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
@extend_schema(
    responses={
        200: FlowInspectionSerializer(),
        400: OpenApiResponse(description="No flow plan in session."),
    },
    request=OpenApiTypes.NONE,
    operation_id="flows_inspector_get",
)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Return the inspector state for this flow from the current session

    The payload holds the serialized history plans that belong to this flow, the
    current plan and an `is_completed` flag. The current plan is the session's
    active plan when there is one; otherwise the most recent history entry is used
    and `is_completed` is true. Responds with HTTP 400 when the session has neither
    an active plan nor any history."""

    def serialize(flow_plan: FlowPlan):
        # Every plan is rendered with the request in the serializer context
        return FlowInspectorPlanSerializer(
            instance=flow_plan, context={"request": request}
        ).data

    history = request.session.get(SESSION_KEY_HISTORY, [])
    # Only history entries recorded for this flow are listed
    plans = [
        serialize(past_plan)
        for past_plan in history
        if past_plan.flow_pk == self.flow.pk.hex
    ]
    is_completed = SESSION_KEY_PLAN not in request.session
    if is_completed:
        try:
            # The fallback takes the last history entry, whatever flow it belongs to
            current_plan = history[-1]
        except IndexError:
            return Response(status=400)
    else:
        current_plan = request.session[SESSION_KEY_PLAN]
    return Response(
        {
            "plans": plans,
            "current_plan": serialize(current_plan),
            "is_completed": is_completed,
        }
    )
Get current flow state and record it
def
dispatch(self, request, *args, **kwargs):
def dispatch(self, request, *args, **kwargs):
    """
    Run one request through the view.

    Stores `args`/`kwargs`, replaces the incoming request with the result of
    `initialize_request()`, runs `initial()`, then calls the handler named after
    the lower-cased HTTP method (or `http_method_not_allowed` when the method is
    not in `http_method_names` or no such handler exists). Any exception raised
    by `initial()` or the handler is turned into a response by
    `handle_exception()`. The response is passed through `finalize_response()`,
    stored on `self.response` and returned.
    """
    self.args, self.kwargs = args, kwargs
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        self.initial(request, *args, **kwargs)

        # Pick the handler method matching the HTTP verb
        verb = request.method.lower()
        handler = (
            getattr(self, verb, self.http_method_not_allowed)
            if verb in self.http_method_names
            else self.http_method_not_allowed
        )
        response = handler(request, *args, **kwargs)
    except Exception as exc:
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response
.dispatch() is pretty much the same as Django's regular dispatch,
but with extra hooks for startup, finalize, and exception handling.