authentik.core.sources.stage
Source flow manager stages
1"""Source flow manager stages""" 2 3from django.http import HttpRequest, HttpResponse 4 5from authentik.core.models import User, UserSourceConnection 6from authentik.events.models import Event, EventAction 7from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 8from authentik.flows.stage import StageView 9 10PLAN_CONTEXT_SOURCES_CONNECTION = "goauthentik.io/sources/connection" 11 12 13class PostSourceStage(StageView): 14 """Dynamically injected stage which saves the Connection after 15 the user has been enrolled.""" 16 17 def dispatch(self, request: HttpRequest) -> HttpResponse: 18 """Stage used after the user has been enrolled""" 19 connection: UserSourceConnection = self.executor.plan.context[ 20 PLAN_CONTEXT_SOURCES_CONNECTION 21 ] 22 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 23 connection.user = user 24 linked = connection.pk is None 25 connection.save() 26 if linked: 27 Event.new( 28 EventAction.SOURCE_LINKED, 29 message="Linked Source", 30 source=connection.source, 31 ).from_http(self.request) 32 return self.executor.stage_ok()
PLAN_CONTEXT_SOURCES_CONNECTION =
'goauthentik.io/sources/connection'
14class PostSourceStage(StageView): 15 """Dynamically injected stage which saves the Connection after 16 the user has been enrolled.""" 17 18 def dispatch(self, request: HttpRequest) -> HttpResponse: 19 """Stage used after the user has been enrolled""" 20 connection: UserSourceConnection = self.executor.plan.context[ 21 PLAN_CONTEXT_SOURCES_CONNECTION 22 ] 23 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 24 connection.user = user 25 linked = connection.pk is None 26 connection.save() 27 if linked: 28 Event.new( 29 EventAction.SOURCE_LINKED, 30 message="Linked Source", 31 source=connection.source, 32 ).from_http(self.request) 33 return self.executor.stage_ok()
Dynamically injected stage which saves the Connection after the user has been enrolled.
def
dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
18 def dispatch(self, request: HttpRequest) -> HttpResponse: 19 """Stage used after the user has been enrolled""" 20 connection: UserSourceConnection = self.executor.plan.context[ 21 PLAN_CONTEXT_SOURCES_CONNECTION 22 ] 23 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 24 connection.user = user 25 linked = connection.pk is None 26 connection.save() 27 if linked: 28 Event.new( 29 EventAction.SOURCE_LINKED, 30 message="Linked Source", 31 source=connection.source, 32 ).from_http(self.request) 33 return self.executor.stage_ok()
Stage used after the user has been enrolled