authentik.core.sources.flow_manager
Source decision helper
1"""Source decision helper""" 2 3from typing import Any 4 5from django.contrib import messages 6from django.db import IntegrityError, transaction 7from django.http import HttpRequest, HttpResponse 8from django.shortcuts import redirect 9from django.urls import reverse 10from django.utils.translation import gettext as _ 11from structlog.stdlib import get_logger 12 13from authentik.core.models import ( 14 Group, 15 GroupSourceConnection, 16 Source, 17 User, 18 UserSourceConnection, 19) 20from authentik.core.sources.mapper import SourceMapper 21from authentik.core.sources.matcher import Action, SourceMatcher 22from authentik.core.sources.stage import ( 23 PLAN_CONTEXT_SOURCES_CONNECTION, 24 PostSourceStage, 25) 26from authentik.events.models import Event, EventAction 27from authentik.flows.exceptions import FlowNonApplicableException 28from authentik.flows.models import Flow, FlowToken, Stage, in_memory_stage 29from authentik.flows.planner import ( 30 PLAN_CONTEXT_IS_RESTORED, 31 PLAN_CONTEXT_PENDING_USER, 32 PLAN_CONTEXT_REDIRECT, 33 PLAN_CONTEXT_SOURCE, 34 PLAN_CONTEXT_SSO, 35 FlowPlanner, 36) 37from authentik.flows.stage import StageView 38from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET 39from authentik.lib.views import bad_request_message 40from authentik.policies.denied import AccessDeniedResponse 41from authentik.policies.utils import delete_none_values 42from authentik.stages.password import BACKEND_INBUILT 43from authentik.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND 44from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 45from authentik.stages.user_write.stage import PLAN_CONTEXT_USER_PATH 46 47LOGGER = get_logger() 48 49PLAN_CONTEXT_SOURCE_GROUPS = "source_groups" 50SESSION_KEY_SOURCE_FLOW_STAGES = "authentik/flows/source_flow_stages" 51SESSION_KEY_SOURCE_FLOW_CONTEXT = "authentik/flows/source_flow_context" 52SESSION_KEY_OVERRIDE_FLOW_TOKEN = "authentik/flows/source_override_flow_token" # nosec 53 54 55class MessageStage(StageView): 56 """Show a pre-configured message after the flow is done""" 57 58 def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 59 """Show a pre-configured message after the flow is done""" 60 message = getattr(self.executor.current_stage, "message", "") 61 level = getattr(self.executor.current_stage, "level", messages.SUCCESS) 62 messages.add_message( 63 self.request, 64 level, 65 message, 66 ) 67 return self.executor.stage_ok() 68 69 70class SourceFlowManager: 71 """Help sources decide what they should do after authorization. Based on source settings and 72 previous connections, authenticate the user, enroll a new user, link to an existing user 73 or deny the request.""" 74 75 source: Source 76 mapper: SourceMapper 77 matcher: SourceMatcher 78 request: HttpRequest 79 80 identifier: str 81 82 user_connection_type: type[UserSourceConnection] 83 group_connection_type: type[GroupSourceConnection] 84 85 user_info: dict[str, Any] 86 policy_context: dict[str, Any] 87 user_properties: dict[str, Any | dict[str, Any]] 88 groups_properties: dict[str, dict[str, Any | dict[str, Any]]] 89 90 def __init__( 91 self, 92 source: Source, 93 request: HttpRequest, 94 identifier: str, 95 user_info: dict[str, Any], 96 policy_context: dict[str, Any], 97 ) -> None: 98 self.source = source 99 self.mapper = SourceMapper(self.source) 100 self.matcher = SourceMatcher( 101 self.source, self.user_connection_type, self.group_connection_type 102 ) 103 self.request = request 104 self.identifier = identifier 105 self.user_info = user_info 106 self._logger = get_logger().bind(source=source, identifier=identifier) 107 self.policy_context = policy_context 108 109 self.user_properties = self.mapper.build_object_properties( 110 object_type=User, request=request, user=None, **self.user_info 111 ) 112 self.groups_properties = { 113 group_id: self.mapper.build_object_properties( 114 object_type=Group, 115 request=request, 116 user=None, 117 group_id=group_id, 118 **self.user_info, 119 ) 120 for group_id in self.user_properties.setdefault("groups", []) 121 } 122 del self.user_properties["groups"] 123 124 def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]: # noqa: PLR0911 125 """decide which action should be taken""" 126 # When request is authenticated, always link 127 if self.request.user.is_authenticated: 128 new_connection = self.user_connection_type( 129 source=self.source, identifier=self.identifier 130 ) 131 new_connection.user = self.request.user 132 new_connection = self.update_user_connection(new_connection, **kwargs) 133 if existing := self.user_connection_type.objects.filter( 134 source=self.source, identifier=self.identifier 135 ).first(): 136 existing = self.update_user_connection(existing) 137 return Action.AUTH, existing 138 return Action.LINK, new_connection 139 140 action, connection = self.matcher.get_user_action(self.identifier, self.user_properties) 141 if connection: 142 connection = self.update_user_connection(connection, **kwargs) 143 return action, connection 144 145 def update_user_connection( 146 self, connection: UserSourceConnection, **kwargs 147 ) -> UserSourceConnection: # pragma: no cover 148 """Optionally make changes to the user connection after it is looked up/created.""" 149 return connection 150 151 def get_flow(self, **kwargs) -> HttpResponse: 152 """Get the flow response based on user_matching_mode""" 153 try: 154 action, connection = self.get_action(**kwargs) 155 except IntegrityError as exc: 156 self._logger.warning("failed to get action", exc=exc) 157 return redirect(reverse("authentik_core:root-redirect")) 158 self._logger.debug("get_action", action=action, connection=connection) 159 try: 160 if connection: 161 if action == Action.LINK: 162 self._logger.debug("Linking existing user") 163 return self.handle_existing_link(connection) 164 if action == Action.AUTH: 165 self._logger.debug("Handling auth user") 166 return self.handle_auth(connection) 167 if action == Action.ENROLL: 168 self._logger.debug("Handling enrollment of new user") 169 return self.handle_enroll(connection) 170 except FlowNonApplicableException as exc: 171 self._logger.warning("Flow non applicable", exc=exc) 172 return self.error_handler(exc) 173 # Default case, assume deny 174 error = Exception( 175 _( 176 "Request to authenticate with {source} has been denied. Please authenticate " 177 "with the source you've previously signed up with.".format_map( 178 {"source": self.source.name} 179 ) 180 ), 181 ) 182 return self.error_handler(error) 183 184 def error_handler(self, error: Exception) -> HttpResponse: 185 """Handle any errors by returning an access denied stage""" 186 response = AccessDeniedResponse(self.request) 187 response.error_message = str(error) 188 if isinstance(error, FlowNonApplicableException): 189 response.policy_result = error.policy_result 190 response.error_message = error.messages 191 return response 192 193 def get_stages_to_append(self, flow: Flow) -> list[Stage]: 194 """Hook to override stages which are appended to the flow""" 195 return [ 196 in_memory_stage(PostSourceStage), 197 ] 198 199 def _prepare_flow( 200 self, 201 flow: Flow | None, 202 connection: UserSourceConnection, 203 stages: list[StageView] | None = None, 204 **flow_context, 205 ) -> HttpResponse: 206 """Prepare Authentication Plan, redirect user FlowExecutor""" 207 # Ensure redirect is carried through when user was trying to 208 # authorize application 209 final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get( 210 NEXT_ARG_NAME, "authentik_core:if-user" 211 ) 212 flow_context.update( 213 { 214 # Since we authenticate the user by their token, they have no backend set 215 PLAN_CONTEXT_AUTHENTICATION_BACKEND: BACKEND_INBUILT, 216 PLAN_CONTEXT_SSO: True, 217 PLAN_CONTEXT_SOURCE: self.source, 218 PLAN_CONTEXT_SOURCES_CONNECTION: connection, 219 PLAN_CONTEXT_SOURCE_GROUPS: self.groups_properties, 220 } 221 ) 222 flow_context.update(self.policy_context) 223 flow_context.setdefault(PLAN_CONTEXT_REDIRECT, final_redirect) 224 225 if not flow: 226 # We only check for the flow token here if we don't have a flow, otherwise we rely on 227 # SESSION_KEY_SOURCE_FLOW_STAGES to delegate the usage of this token and dynamically add 228 # stages that deal with this token to return to another flow 229 if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session: 230 token: FlowToken = self.request.session.get(SESSION_KEY_OVERRIDE_FLOW_TOKEN) 231 self._logger.info( 232 "Replacing source flow with overridden flow", flow=token.flow.slug 233 ) 234 plan = token.plan 235 plan.context[PLAN_CONTEXT_IS_RESTORED] = token 236 plan.context.update(flow_context) 237 for stage in self.get_stages_to_append(flow): 238 plan.append_stage(stage) 239 if stages: 240 for stage in stages: 241 plan.append_stage(stage) 242 redirect = plan.to_redirect(self.request, token.flow) 243 token.delete() 244 return redirect 245 return bad_request_message( 246 self.request, 247 _("Configured flow does not exist."), 248 ) 249 # We run the Flow planner here so we can pass the Pending user in the context 250 planner = FlowPlanner(flow) 251 # We append some stages so the initial flow we get might be empty 252 planner.allow_empty_flows = True 253 planner.use_cache = False 254 plan = planner.plan(self.request, flow_context) 255 for stage in self.get_stages_to_append(flow): 256 plan.append_stage(stage) 257 plan.append_stage( 258 in_memory_stage(GroupUpdateStage, group_connection_type=self.group_connection_type) 259 ) 260 if stages: 261 for stage in stages: 262 plan.append_stage(stage) 263 for stage in self.request.session.get(SESSION_KEY_SOURCE_FLOW_STAGES, []): 264 plan.append_stage(stage) 265 plan.context.update(self.request.session.get(SESSION_KEY_SOURCE_FLOW_CONTEXT, {})) 266 return plan.to_redirect(self.request, flow) 267 268 def handle_auth( 269 self, 270 connection: UserSourceConnection, 271 ) -> HttpResponse: 272 """Login user and redirect.""" 273 return self._prepare_flow( 274 self.source.authentication_flow, 275 connection, 276 stages=[ 277 in_memory_stage( 278 MessageStage, 279 message=_( 280 "Successfully authenticated with {source}!".format_map( 281 {"source": self.source.name} 282 ) 283 ), 284 ) 285 ], 286 **{ 287 PLAN_CONTEXT_PENDING_USER: connection.user, 288 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 289 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 290 }, 291 ) 292 293 def handle_existing_link( 294 self, 295 connection: UserSourceConnection, 296 ) -> HttpResponse: 297 """Handler when the user was already authenticated and linked an external source 298 to their account.""" 299 # When request isn't authenticated we jump straight to auth 300 if not self.request.user.is_authenticated: 301 return self.handle_auth(connection) 302 # When an override flow token exists we actually still use a flow for link 303 # to continue the existing flow we came from 304 if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session: 305 return self._prepare_flow(None, connection) 306 connection.save() 307 Event.new( 308 EventAction.SOURCE_LINKED, 309 message="Linked Source", 310 source=self.source, 311 ).from_http(self.request) 312 messages.success( 313 self.request, 314 _("Successfully linked {source}!".format_map({"source": self.source.name})), 315 ) 316 return redirect( 317 reverse( 318 "authentik_core:if-user", 319 ) 320 + "#/settings;page-sources" 321 ) 322 323 def handle_enroll( 324 self, 325 connection: UserSourceConnection, 326 ) -> HttpResponse: 327 """User was not authenticated and previous request was not authenticated.""" 328 # We run the Flow planner here so we can pass the Pending user in the context 329 if not self.source.enrollment_flow: 330 self._logger.warning("source has no enrollment flow") 331 return bad_request_message( 332 self.request, 333 _("Source is not configured for enrollment."), 334 ) 335 return self._prepare_flow( 336 self.source.enrollment_flow, 337 connection, 338 stages=[ 339 in_memory_stage( 340 MessageStage, 341 message=_( 342 "Successfully authenticated with {source}!".format_map( 343 {"source": self.source.name} 344 ) 345 ), 346 ) 347 ], 348 **{ 349 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 350 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 351 }, 352 ) 353 354 355class GroupUpdateStage(StageView): 356 """Dynamically injected stage which updates the user after enrollment/authentication.""" 357 358 def handle_group( 359 self, group_id: str, group_properties: dict[str, Any | dict[str, Any]] 360 ) -> Group | None: 361 action, connection = self.matcher.get_group_action(group_id, group_properties) 362 if action == Action.ENROLL: 363 group = Group.objects.create(**group_properties) 364 connection.group = group 365 connection.save() 366 return group 367 elif action in (Action.LINK, Action.AUTH): 368 group = connection.group 369 group.update_attributes(group_properties) 370 connection.save() 371 return group 372 373 return None 374 375 def handle_groups(self) -> bool: 376 self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE] 377 self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 378 self.group_connection_type: GroupSourceConnection = ( 379 self.executor.current_stage.group_connection_type 380 ) 381 self.matcher = SourceMatcher(self.source, None, self.group_connection_type) 382 383 raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[ 384 PLAN_CONTEXT_SOURCE_GROUPS 385 ] 386 groups: list[Group] = [] 387 388 for group_id, group_properties in raw_groups.items(): 389 group = self.handle_group(group_id, group_properties) 390 if not group: 391 return False 392 groups.append(group) 393 394 with transaction.atomic(): 395 self.user.groups.remove( 396 *self.user.groups.filter(groupsourceconnection__source=self.source) 397 ) 398 self.user.groups.add(*groups) 399 400 return True 401 402 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 403 """Stage used after the user has been enrolled to sync their groups from source data""" 404 if self.handle_groups(): 405 return self.executor.stage_ok() 406 else: 407 return self.executor.stage_invalid("Failed to update groups. Please try again later.") 408 409 def post(self, request: HttpRequest) -> HttpResponse: 410 """Wrapper for post requests""" 411 return self.get(request)
56class MessageStage(StageView): 57 """Show a pre-configured message after the flow is done""" 58 59 def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 60 """Show a pre-configured message after the flow is done""" 61 message = getattr(self.executor.current_stage, "message", "") 62 level = getattr(self.executor.current_stage, "level", messages.SUCCESS) 63 messages.add_message( 64 self.request, 65 level, 66 message, 67 ) 68 return self.executor.stage_ok()
Show a pre-configured message after the flow is done
59 def dispatch(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 60 """Show a pre-configured message after the flow is done""" 61 message = getattr(self.executor.current_stage, "message", "") 62 level = getattr(self.executor.current_stage, "level", messages.SUCCESS) 63 messages.add_message( 64 self.request, 65 level, 66 message, 67 ) 68 return self.executor.stage_ok()
Show a pre-configured message after the flow is done
Inherited Members
71class SourceFlowManager: 72 """Help sources decide what they should do after authorization. Based on source settings and 73 previous connections, authenticate the user, enroll a new user, link to an existing user 74 or deny the request.""" 75 76 source: Source 77 mapper: SourceMapper 78 matcher: SourceMatcher 79 request: HttpRequest 80 81 identifier: str 82 83 user_connection_type: type[UserSourceConnection] 84 group_connection_type: type[GroupSourceConnection] 85 86 user_info: dict[str, Any] 87 policy_context: dict[str, Any] 88 user_properties: dict[str, Any | dict[str, Any]] 89 groups_properties: dict[str, dict[str, Any | dict[str, Any]]] 90 91 def __init__( 92 self, 93 source: Source, 94 request: HttpRequest, 95 identifier: str, 96 user_info: dict[str, Any], 97 policy_context: dict[str, Any], 98 ) -> None: 99 self.source = source 100 self.mapper = SourceMapper(self.source) 101 self.matcher = SourceMatcher( 102 self.source, self.user_connection_type, self.group_connection_type 103 ) 104 self.request = request 105 self.identifier = identifier 106 self.user_info = user_info 107 self._logger = get_logger().bind(source=source, identifier=identifier) 108 self.policy_context = policy_context 109 110 self.user_properties = self.mapper.build_object_properties( 111 object_type=User, request=request, user=None, **self.user_info 112 ) 113 self.groups_properties = { 114 group_id: self.mapper.build_object_properties( 115 object_type=Group, 116 request=request, 117 user=None, 118 group_id=group_id, 119 **self.user_info, 120 ) 121 for group_id in self.user_properties.setdefault("groups", []) 122 } 123 del self.user_properties["groups"] 124 125 def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]: # noqa: PLR0911 126 """decide which action should be taken""" 127 # When request is authenticated, always link 128 if self.request.user.is_authenticated: 129 new_connection = self.user_connection_type( 130 source=self.source, identifier=self.identifier 131 ) 132 new_connection.user = self.request.user 133 new_connection = self.update_user_connection(new_connection, **kwargs) 134 if existing := self.user_connection_type.objects.filter( 135 source=self.source, identifier=self.identifier 136 ).first(): 137 existing = self.update_user_connection(existing) 138 return Action.AUTH, existing 139 return Action.LINK, new_connection 140 141 action, connection = self.matcher.get_user_action(self.identifier, self.user_properties) 142 if connection: 143 connection = self.update_user_connection(connection, **kwargs) 144 return action, connection 145 146 def update_user_connection( 147 self, connection: UserSourceConnection, **kwargs 148 ) -> UserSourceConnection: # pragma: no cover 149 """Optionally make changes to the user connection after it is looked up/created.""" 150 return connection 151 152 def get_flow(self, **kwargs) -> HttpResponse: 153 """Get the flow response based on user_matching_mode""" 154 try: 155 action, connection = self.get_action(**kwargs) 156 except IntegrityError as exc: 157 self._logger.warning("failed to get action", exc=exc) 158 return redirect(reverse("authentik_core:root-redirect")) 159 self._logger.debug("get_action", action=action, connection=connection) 160 try: 161 if connection: 162 if action == Action.LINK: 163 self._logger.debug("Linking existing user") 164 return self.handle_existing_link(connection) 165 if action == Action.AUTH: 166 self._logger.debug("Handling auth user") 167 return self.handle_auth(connection) 168 if action == Action.ENROLL: 169 self._logger.debug("Handling enrollment of new user") 170 return self.handle_enroll(connection) 171 except FlowNonApplicableException as exc: 172 self._logger.warning("Flow non applicable", exc=exc) 173 return self.error_handler(exc) 174 # Default case, assume deny 175 error = Exception( 176 _( 177 "Request to authenticate with {source} has been denied. Please authenticate " 178 "with the source you've previously signed up with.".format_map( 179 {"source": self.source.name} 180 ) 181 ), 182 ) 183 return self.error_handler(error) 184 185 def error_handler(self, error: Exception) -> HttpResponse: 186 """Handle any errors by returning an access denied stage""" 187 response = AccessDeniedResponse(self.request) 188 response.error_message = str(error) 189 if isinstance(error, FlowNonApplicableException): 190 response.policy_result = error.policy_result 191 response.error_message = error.messages 192 return response 193 194 def get_stages_to_append(self, flow: Flow) -> list[Stage]: 195 """Hook to override stages which are appended to the flow""" 196 return [ 197 in_memory_stage(PostSourceStage), 198 ] 199 200 def _prepare_flow( 201 self, 202 flow: Flow | None, 203 connection: UserSourceConnection, 204 stages: list[StageView] | None = None, 205 **flow_context, 206 ) -> HttpResponse: 207 """Prepare Authentication Plan, redirect user FlowExecutor""" 208 # Ensure redirect is carried through when user was trying to 209 # authorize application 210 final_redirect = self.request.session.get(SESSION_KEY_GET, {}).get( 211 NEXT_ARG_NAME, "authentik_core:if-user" 212 ) 213 flow_context.update( 214 { 215 # Since we authenticate the user by their token, they have no backend set 216 PLAN_CONTEXT_AUTHENTICATION_BACKEND: BACKEND_INBUILT, 217 PLAN_CONTEXT_SSO: True, 218 PLAN_CONTEXT_SOURCE: self.source, 219 PLAN_CONTEXT_SOURCES_CONNECTION: connection, 220 PLAN_CONTEXT_SOURCE_GROUPS: self.groups_properties, 221 } 222 ) 223 flow_context.update(self.policy_context) 224 flow_context.setdefault(PLAN_CONTEXT_REDIRECT, final_redirect) 225 226 if not flow: 227 # We only check for the flow token here if we don't have a flow, otherwise we rely on 228 # SESSION_KEY_SOURCE_FLOW_STAGES to delegate the usage of this token and dynamically add 229 # stages that deal with this token to return to another flow 230 if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session: 231 token: FlowToken = self.request.session.get(SESSION_KEY_OVERRIDE_FLOW_TOKEN) 232 self._logger.info( 233 "Replacing source flow with overridden flow", flow=token.flow.slug 234 ) 235 plan = token.plan 236 plan.context[PLAN_CONTEXT_IS_RESTORED] = token 237 plan.context.update(flow_context) 238 for stage in self.get_stages_to_append(flow): 239 plan.append_stage(stage) 240 if stages: 241 for stage in stages: 242 plan.append_stage(stage) 243 redirect = plan.to_redirect(self.request, token.flow) 244 token.delete() 245 return redirect 246 return bad_request_message( 247 self.request, 248 _("Configured flow does not exist."), 249 ) 250 # We run the Flow planner here so we can pass the Pending user in the context 251 planner = FlowPlanner(flow) 252 # We append some stages so the initial flow we get might be empty 253 planner.allow_empty_flows = True 254 planner.use_cache = False 255 plan = planner.plan(self.request, flow_context) 256 for stage in self.get_stages_to_append(flow): 257 plan.append_stage(stage) 258 plan.append_stage( 259 in_memory_stage(GroupUpdateStage, group_connection_type=self.group_connection_type) 260 ) 261 if stages: 262 for stage in stages: 263 plan.append_stage(stage) 264 for stage in self.request.session.get(SESSION_KEY_SOURCE_FLOW_STAGES, []): 265 plan.append_stage(stage) 266 plan.context.update(self.request.session.get(SESSION_KEY_SOURCE_FLOW_CONTEXT, {})) 267 return plan.to_redirect(self.request, flow) 268 269 def handle_auth( 270 self, 271 connection: UserSourceConnection, 272 ) -> HttpResponse: 273 """Login user and redirect.""" 274 return self._prepare_flow( 275 self.source.authentication_flow, 276 connection, 277 stages=[ 278 in_memory_stage( 279 MessageStage, 280 message=_( 281 "Successfully authenticated with {source}!".format_map( 282 {"source": self.source.name} 283 ) 284 ), 285 ) 286 ], 287 **{ 288 PLAN_CONTEXT_PENDING_USER: connection.user, 289 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 290 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 291 }, 292 ) 293 294 def handle_existing_link( 295 self, 296 connection: UserSourceConnection, 297 ) -> HttpResponse: 298 """Handler when the user was already authenticated and linked an external source 299 to their account.""" 300 # When request isn't authenticated we jump straight to auth 301 if not self.request.user.is_authenticated: 302 return self.handle_auth(connection) 303 # When an override flow token exists we actually still use a flow for link 304 # to continue the existing flow we came from 305 if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session: 306 return self._prepare_flow(None, connection) 307 connection.save() 308 Event.new( 309 EventAction.SOURCE_LINKED, 310 message="Linked Source", 311 source=self.source, 312 ).from_http(self.request) 313 messages.success( 314 self.request, 315 _("Successfully linked {source}!".format_map({"source": self.source.name})), 316 ) 317 return redirect( 318 reverse( 319 "authentik_core:if-user", 320 ) 321 + "#/settings;page-sources" 322 ) 323 324 def handle_enroll( 325 self, 326 connection: UserSourceConnection, 327 ) -> HttpResponse: 328 """User was not authenticated and previous request was not authenticated.""" 329 # We run the Flow planner here so we can pass the Pending user in the context 330 if not self.source.enrollment_flow: 331 self._logger.warning("source has no enrollment flow") 332 return bad_request_message( 333 self.request, 334 _("Source is not configured for enrollment."), 335 ) 336 return self._prepare_flow( 337 self.source.enrollment_flow, 338 connection, 339 stages=[ 340 in_memory_stage( 341 MessageStage, 342 message=_( 343 "Successfully authenticated with {source}!".format_map( 344 {"source": self.source.name} 345 ) 346 ), 347 ) 348 ], 349 **{ 350 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 351 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 352 }, 353 )
Help sources decide what they should do after authorization. Based on source settings and previous connections, authenticate the user, enroll a new user, link to an existing user or deny the request.
91 def __init__( 92 self, 93 source: Source, 94 request: HttpRequest, 95 identifier: str, 96 user_info: dict[str, Any], 97 policy_context: dict[str, Any], 98 ) -> None: 99 self.source = source 100 self.mapper = SourceMapper(self.source) 101 self.matcher = SourceMatcher( 102 self.source, self.user_connection_type, self.group_connection_type 103 ) 104 self.request = request 105 self.identifier = identifier 106 self.user_info = user_info 107 self._logger = get_logger().bind(source=source, identifier=identifier) 108 self.policy_context = policy_context 109 110 self.user_properties = self.mapper.build_object_properties( 111 object_type=User, request=request, user=None, **self.user_info 112 ) 113 self.groups_properties = { 114 group_id: self.mapper.build_object_properties( 115 object_type=Group, 116 request=request, 117 user=None, 118 group_id=group_id, 119 **self.user_info, 120 ) 121 for group_id in self.user_properties.setdefault("groups", []) 122 } 123 del self.user_properties["groups"]
125 def get_action(self, **kwargs) -> tuple[Action, UserSourceConnection | None]: # noqa: PLR0911 126 """decide which action should be taken""" 127 # When request is authenticated, always link 128 if self.request.user.is_authenticated: 129 new_connection = self.user_connection_type( 130 source=self.source, identifier=self.identifier 131 ) 132 new_connection.user = self.request.user 133 new_connection = self.update_user_connection(new_connection, **kwargs) 134 if existing := self.user_connection_type.objects.filter( 135 source=self.source, identifier=self.identifier 136 ).first(): 137 existing = self.update_user_connection(existing) 138 return Action.AUTH, existing 139 return Action.LINK, new_connection 140 141 action, connection = self.matcher.get_user_action(self.identifier, self.user_properties) 142 if connection: 143 connection = self.update_user_connection(connection, **kwargs) 144 return action, connection
decide which action should be taken
146 def update_user_connection( 147 self, connection: UserSourceConnection, **kwargs 148 ) -> UserSourceConnection: # pragma: no cover 149 """Optionally make changes to the user connection after it is looked up/created.""" 150 return connection
Optionally make changes to the user connection after it is looked up/created.
152 def get_flow(self, **kwargs) -> HttpResponse: 153 """Get the flow response based on user_matching_mode""" 154 try: 155 action, connection = self.get_action(**kwargs) 156 except IntegrityError as exc: 157 self._logger.warning("failed to get action", exc=exc) 158 return redirect(reverse("authentik_core:root-redirect")) 159 self._logger.debug("get_action", action=action, connection=connection) 160 try: 161 if connection: 162 if action == Action.LINK: 163 self._logger.debug("Linking existing user") 164 return self.handle_existing_link(connection) 165 if action == Action.AUTH: 166 self._logger.debug("Handling auth user") 167 return self.handle_auth(connection) 168 if action == Action.ENROLL: 169 self._logger.debug("Handling enrollment of new user") 170 return self.handle_enroll(connection) 171 except FlowNonApplicableException as exc: 172 self._logger.warning("Flow non applicable", exc=exc) 173 return self.error_handler(exc) 174 # Default case, assume deny 175 error = Exception( 176 _( 177 "Request to authenticate with {source} has been denied. Please authenticate " 178 "with the source you've previously signed up with.".format_map( 179 {"source": self.source.name} 180 ) 181 ), 182 ) 183 return self.error_handler(error)
Get the flow response based on user_matching_mode
185 def error_handler(self, error: Exception) -> HttpResponse: 186 """Handle any errors by returning an access denied stage""" 187 response = AccessDeniedResponse(self.request) 188 response.error_message = str(error) 189 if isinstance(error, FlowNonApplicableException): 190 response.policy_result = error.policy_result 191 response.error_message = error.messages 192 return response
Handle any errors by returning an access denied stage
194 def get_stages_to_append(self, flow: Flow) -> list[Stage]: 195 """Hook to override stages which are appended to the flow""" 196 return [ 197 in_memory_stage(PostSourceStage), 198 ]
Hook to override stages which are appended to the flow
269 def handle_auth( 270 self, 271 connection: UserSourceConnection, 272 ) -> HttpResponse: 273 """Login user and redirect.""" 274 return self._prepare_flow( 275 self.source.authentication_flow, 276 connection, 277 stages=[ 278 in_memory_stage( 279 MessageStage, 280 message=_( 281 "Successfully authenticated with {source}!".format_map( 282 {"source": self.source.name} 283 ) 284 ), 285 ) 286 ], 287 **{ 288 PLAN_CONTEXT_PENDING_USER: connection.user, 289 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 290 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 291 }, 292 )
Login user and redirect.
294 def handle_existing_link( 295 self, 296 connection: UserSourceConnection, 297 ) -> HttpResponse: 298 """Handler when the user was already authenticated and linked an external source 299 to their account.""" 300 # When request isn't authenticated we jump straight to auth 301 if not self.request.user.is_authenticated: 302 return self.handle_auth(connection) 303 # When an override flow token exists we actually still use a flow for link 304 # to continue the existing flow we came from 305 if SESSION_KEY_OVERRIDE_FLOW_TOKEN in self.request.session: 306 return self._prepare_flow(None, connection) 307 connection.save() 308 Event.new( 309 EventAction.SOURCE_LINKED, 310 message="Linked Source", 311 source=self.source, 312 ).from_http(self.request) 313 messages.success( 314 self.request, 315 _("Successfully linked {source}!".format_map({"source": self.source.name})), 316 ) 317 return redirect( 318 reverse( 319 "authentik_core:if-user", 320 ) 321 + "#/settings;page-sources" 322 )
Handler when the user was already authenticated and linked an external source to their account.
324 def handle_enroll( 325 self, 326 connection: UserSourceConnection, 327 ) -> HttpResponse: 328 """User was not authenticated and previous request was not authenticated.""" 329 # We run the Flow planner here so we can pass the Pending user in the context 330 if not self.source.enrollment_flow: 331 self._logger.warning("source has no enrollment flow") 332 return bad_request_message( 333 self.request, 334 _("Source is not configured for enrollment."), 335 ) 336 return self._prepare_flow( 337 self.source.enrollment_flow, 338 connection, 339 stages=[ 340 in_memory_stage( 341 MessageStage, 342 message=_( 343 "Successfully authenticated with {source}!".format_map( 344 {"source": self.source.name} 345 ) 346 ), 347 ) 348 ], 349 **{ 350 PLAN_CONTEXT_PROMPT: delete_none_values(self.user_properties), 351 PLAN_CONTEXT_USER_PATH: self.source.get_user_path(), 352 }, 353 )
User was not authenticated and previous request was not authenticated.
356class GroupUpdateStage(StageView): 357 """Dynamically injected stage which updates the user after enrollment/authentication.""" 358 359 def handle_group( 360 self, group_id: str, group_properties: dict[str, Any | dict[str, Any]] 361 ) -> Group | None: 362 action, connection = self.matcher.get_group_action(group_id, group_properties) 363 if action == Action.ENROLL: 364 group = Group.objects.create(**group_properties) 365 connection.group = group 366 connection.save() 367 return group 368 elif action in (Action.LINK, Action.AUTH): 369 group = connection.group 370 group.update_attributes(group_properties) 371 connection.save() 372 return group 373 374 return None 375 376 def handle_groups(self) -> bool: 377 self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE] 378 self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 379 self.group_connection_type: GroupSourceConnection = ( 380 self.executor.current_stage.group_connection_type 381 ) 382 self.matcher = SourceMatcher(self.source, None, self.group_connection_type) 383 384 raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[ 385 PLAN_CONTEXT_SOURCE_GROUPS 386 ] 387 groups: list[Group] = [] 388 389 for group_id, group_properties in raw_groups.items(): 390 group = self.handle_group(group_id, group_properties) 391 if not group: 392 return False 393 groups.append(group) 394 395 with transaction.atomic(): 396 self.user.groups.remove( 397 *self.user.groups.filter(groupsourceconnection__source=self.source) 398 ) 399 self.user.groups.add(*groups) 400 401 return True 402 403 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 404 """Stage used after the user has been enrolled to sync their groups from source data""" 405 if self.handle_groups(): 406 return self.executor.stage_ok() 407 else: 408 return self.executor.stage_invalid("Failed to update groups. Please try again later.") 409 410 def post(self, request: HttpRequest) -> HttpResponse: 411 """Wrapper for post requests""" 412 return self.get(request)
Dynamically injected stage which updates the user after enrollment/authentication.
359 def handle_group( 360 self, group_id: str, group_properties: dict[str, Any | dict[str, Any]] 361 ) -> Group | None: 362 action, connection = self.matcher.get_group_action(group_id, group_properties) 363 if action == Action.ENROLL: 364 group = Group.objects.create(**group_properties) 365 connection.group = group 366 connection.save() 367 return group 368 elif action in (Action.LINK, Action.AUTH): 369 group = connection.group 370 group.update_attributes(group_properties) 371 connection.save() 372 return group 373 374 return None
376 def handle_groups(self) -> bool: 377 self.source: Source = self.executor.plan.context[PLAN_CONTEXT_SOURCE] 378 self.user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 379 self.group_connection_type: GroupSourceConnection = ( 380 self.executor.current_stage.group_connection_type 381 ) 382 self.matcher = SourceMatcher(self.source, None, self.group_connection_type) 383 384 raw_groups: dict[str, dict[str, Any | dict[str, Any]]] = self.executor.plan.context[ 385 PLAN_CONTEXT_SOURCE_GROUPS 386 ] 387 groups: list[Group] = [] 388 389 for group_id, group_properties in raw_groups.items(): 390 group = self.handle_group(group_id, group_properties) 391 if not group: 392 return False 393 groups.append(group) 394 395 with transaction.atomic(): 396 self.user.groups.remove( 397 *self.user.groups.filter(groupsourceconnection__source=self.source) 398 ) 399 self.user.groups.add(*groups) 400 401 return True
403 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 404 """Stage used after the user has been enrolled to sync their groups from source data""" 405 if self.handle_groups(): 406 return self.executor.stage_ok() 407 else: 408 return self.executor.stage_invalid("Failed to update groups. Please try again later.")
Stage used after the user has been enrolled to sync their groups from source data
410 def post(self, request: HttpRequest) -> HttpResponse: 411 """Wrapper for post requests""" 412 return self.get(request)
Wrapper for post requests