authentik.endpoints.stage

 1from authentik.endpoints.models import Connector, EndpointStage, StageMode
 2from authentik.flows.stage import StageView
 3
 4PLAN_CONTEXT_ENDPOINT_CONNECTOR = "endpoint_connector"
 5
 6
 7class EndpointStageView(StageView):
 8
 9    def _get_inner(self) -> StageView | None:
10        stage: EndpointStage = self.executor.current_stage
11        connector: Connector = stage.connector
12        if not connector.enabled:
13            return None
14        inner_stage: type[StageView] | None = connector.stage
15        if not inner_stage:
16            return None
17        return inner_stage(self.executor, request=self.request)
18
19    def dispatch(self, request, *args, **kwargs):
20        inner = self._get_inner()
21        if inner is None:
22            stage: EndpointStage = self.executor.current_stage
23            if stage.mode == StageMode.OPTIONAL:
24                return self.executor.stage_ok()
25            else:
26                return self.executor.stage_invalid("Invalid stage configuration")
27        return inner.dispatch(request, *args, **kwargs)
28
29    def cleanup(self):
30        inner = self._get_inner()
31        if inner is not None:
32            return inner.cleanup()
PLAN_CONTEXT_ENDPOINT_CONNECTOR = 'endpoint_connector'
class EndpointStageView(authentik.flows.stage.StageView):
 8class EndpointStageView(StageView):
 9
10    def _get_inner(self) -> StageView | None:
11        stage: EndpointStage = self.executor.current_stage
12        connector: Connector = stage.connector
13        if not connector.enabled:
14            return None
15        inner_stage: type[StageView] | None = connector.stage
16        if not inner_stage:
17            return None
18        return inner_stage(self.executor, request=self.request)
19
20    def dispatch(self, request, *args, **kwargs):
21        inner = self._get_inner()
22        if inner is None:
23            stage: EndpointStage = self.executor.current_stage
24            if stage.mode == StageMode.OPTIONAL:
25                return self.executor.stage_ok()
26            else:
27                return self.executor.stage_invalid("Invalid stage configuration")
28        return inner.dispatch(request, *args, **kwargs)
29
30    def cleanup(self):
31        inner = self._get_inner()
32        if inner is not None:
33            return inner.cleanup()

Abstract Stage

def dispatch(self, request, *args, **kwargs):
20    def dispatch(self, request, *args, **kwargs):
21        inner = self._get_inner()
22        if inner is None:
23            stage: EndpointStage = self.executor.current_stage
24            if stage.mode == StageMode.OPTIONAL:
25                return self.executor.stage_ok()
26            else:
27                return self.executor.stage_invalid("Invalid stage configuration")
28        return inner.dispatch(request, *args, **kwargs)
def cleanup(self):
30    def cleanup(self):
31        inner = self._get_inner()
32        if inner is not None:
33            return inner.cleanup()

Cleanup session