authentik.core.sources.stage

Source flow manager stages

 1"""Source flow manager stages"""
 2
 3from django.http import HttpRequest, HttpResponse
 4
 5from authentik.core.models import User, UserSourceConnection
 6from authentik.events.models import Event, EventAction
 7from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 8from authentik.flows.stage import StageView
 9
10PLAN_CONTEXT_SOURCES_CONNECTION = "goauthentik.io/sources/connection"
11
12
13class PostSourceStage(StageView):
14    """Dynamically injected stage which saves the Connection after
15    the user has been enrolled."""
16
17    def dispatch(self, request: HttpRequest) -> HttpResponse:
18        """Stage used after the user has been enrolled"""
19        connection: UserSourceConnection = self.executor.plan.context[
20            PLAN_CONTEXT_SOURCES_CONNECTION
21        ]
22        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
23        connection.user = user
24        linked = connection.pk is None
25        connection.save()
26        if linked:
27            Event.new(
28                EventAction.SOURCE_LINKED,
29                message="Linked Source",
30                source=connection.source,
31            ).from_http(self.request)
32        return self.executor.stage_ok()
PLAN_CONTEXT_SOURCES_CONNECTION = 'goauthentik.io/sources/connection'
class PostSourceStage(authentik.flows.stage.StageView):
14class PostSourceStage(StageView):
15    """Dynamically injected stage which saves the Connection after
16    the user has been enrolled."""
17
18    def dispatch(self, request: HttpRequest) -> HttpResponse:
19        """Stage used after the user has been enrolled"""
20        connection: UserSourceConnection = self.executor.plan.context[
21            PLAN_CONTEXT_SOURCES_CONNECTION
22        ]
23        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
24        connection.user = user
25        linked = connection.pk is None
26        connection.save()
27        if linked:
28            Event.new(
29                EventAction.SOURCE_LINKED,
30                message="Linked Source",
31                source=connection.source,
32            ).from_http(self.request)
33        return self.executor.stage_ok()

Dynamically injected stage which saves the Connection after the user has been enrolled.

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
18    def dispatch(self, request: HttpRequest) -> HttpResponse:
19        """Stage used after the user has been enrolled"""
20        connection: UserSourceConnection = self.executor.plan.context[
21            PLAN_CONTEXT_SOURCES_CONNECTION
22        ]
23        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
24        connection.user = user
25        linked = connection.pk is None
26        connection.save()
27        if linked:
28            Event.new(
29                EventAction.SOURCE_LINKED,
30                message="Linked Source",
31                source=connection.source,
32            ).from_http(self.request)
33        return self.executor.stage_ok()

Stage used after the user has been enrolled