authentik.flows.stage
authentik stage Base view
1"""authentik stage Base view""" 2 3from typing import TYPE_CHECKING 4from urllib.parse import urlencode 5 6from django.conf import settings 7from django.contrib.auth.models import AnonymousUser 8from django.http import HttpRequest 9from django.http.request import QueryDict 10from django.http.response import HttpResponse 11from django.urls import reverse 12from django.views.generic.base import View 13from prometheus_client import Histogram 14from rest_framework.request import Request 15from sentry_sdk import start_span 16from structlog.stdlib import BoundLogger, get_logger 17 18from authentik.common.oauth.constants import PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI 19from authentik.core.models import Application, User, UserTypes 20from authentik.flows.challenge import ( 21 AccessDeniedChallenge, 22 Challenge, 23 ChallengeResponse, 24 ContextualFlowInfo, 25 HttpChallengeResponse, 26 RedirectChallenge, 27 SessionEndChallenge, 28 WithUserInfoChallenge, 29) 30from authentik.flows.exceptions import StageInvalidException 31from authentik.flows.models import InvalidResponseAction 32from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER 33from authentik.lib.avatars import DEFAULT_AVATAR, get_avatar 34from authentik.lib.utils.reflection import class_to_path 35 36if TYPE_CHECKING: 37 from authentik.flows.views.executor import FlowExecutorView 38 39PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier" 40HIST_FLOWS_STAGE_TIME = Histogram( 41 "authentik_flows_stage_time", 42 "Duration taken by different parts of stages", 43 ["stage_type", "method"], 44) 45 46 47class StageView(View): 48 """Abstract Stage""" 49 50 executor: FlowExecutorView 51 52 request: HttpRequest = None 53 54 logger: BoundLogger 55 56 def __init__(self, executor: FlowExecutorView, **kwargs): 57 self.executor = executor 58 current_stage = getattr(self.executor, "current_stage", None) 59 self.logger = get_logger().bind( 60 stage=getattr(current_stage, "name", None), 61 stage_view=class_to_path(type(self)), 62 ) 63 super().__init__(**kwargs) 64 65 def get_pending_user(self, for_display=False) -> User: 66 """Either show the matched User object or show what the user entered, 67 based on what the earlier stage (mostly IdentificationStage) set. 68 _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for 69 other things besides the form display. 70 71 If no user is pending, returns request.user""" 72 if not self.executor.plan: 73 return self.request.user 74 if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display: 75 return User( 76 username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER), 77 email="", 78 ) 79 if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context: 80 return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 81 return self.request.user 82 83 def cleanup(self): 84 """Cleanup session""" 85 86 87class ChallengeStageView(StageView): 88 """Stage view which response with a challenge""" 89 90 response_class = ChallengeResponse 91 92 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 93 """Return the response class type""" 94 return self.response_class(None, data=data, stage=self) 95 96 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 97 """Return a challenge for the frontend to solve""" 98 try: 99 challenge = self._get_challenge(*args, **kwargs) 100 except StageInvalidException as exc: 101 self.logger.debug("Got StageInvalidException", exc=exc) 102 return self.executor.stage_invalid() 103 if not challenge.is_valid(): 104 self.logger.error( 105 "f(ch): Invalid challenge", 106 errors=challenge.errors, 107 challenge=challenge.data, 108 ) 109 return HttpChallengeResponse(challenge) 110 111 def post(self, request: Request, *args, **kwargs) -> HttpResponse: 112 """Handle challenge response""" 113 valid = False 114 try: 115 challenge: ChallengeResponse = self.get_response_instance(data=request.data) 116 valid = challenge.is_valid() 117 except StageInvalidException as exc: 118 self.logger.debug("Got StageInvalidException", exc=exc) 119 return self.executor.stage_invalid() 120 if not valid: 121 if self.executor.current_binding.invalid_response_action in [ 122 InvalidResponseAction.RESTART, 123 InvalidResponseAction.RESTART_WITH_CONTEXT, 124 ]: 125 keep_context = ( 126 self.executor.current_binding.invalid_response_action 127 == InvalidResponseAction.RESTART_WITH_CONTEXT 128 ) 129 self.logger.debug( 130 "f(ch): Invalid response, restarting flow", 131 keep_context=keep_context, 132 ) 133 return self.executor.restart_flow(keep_context) 134 with ( 135 start_span( 136 op="authentik.flow.stage.challenge_invalid", 137 name=self.__class__.__name__, 138 ), 139 HIST_FLOWS_STAGE_TIME.labels( 140 stage_type=self.__class__.__name__, method="challenge_invalid" 141 ).time(), 142 ): 143 return self.challenge_invalid(challenge) 144 with ( 145 start_span( 146 op="authentik.flow.stage.challenge_valid", 147 name=self.__class__.__name__, 148 ), 149 HIST_FLOWS_STAGE_TIME.labels( 150 stage_type=self.__class__.__name__, method="challenge_valid" 151 ).time(), 152 ): 153 return self.challenge_valid(challenge) 154 155 def format_title(self) -> str: 156 """Allow usage of placeholder in flow title.""" 157 if not self.executor.plan: 158 return self.executor.flow.title 159 try: 160 return self.executor.flow.title % { 161 "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""), 162 "user": self.get_pending_user(for_display=True), 163 } 164 165 except Exception as exc: # noqa 166 self.logger.warning("failed to template title", exc=exc) 167 return self.executor.flow.title 168 169 @property 170 def cancel_url(self) -> str: 171 from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET 172 173 next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME) 174 url = reverse("authentik_flows:cancel") 175 if next_param: 176 return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}" 177 return url 178 179 def _get_challenge(self, *args, **kwargs) -> Challenge: 180 with ( 181 start_span( 182 op="authentik.flow.stage.get_challenge", 183 name=self.__class__.__name__, 184 ), 185 HIST_FLOWS_STAGE_TIME.labels( 186 stage_type=self.__class__.__name__, method="get_challenge" 187 ).time(), 188 ): 189 challenge = self.get_challenge(*args, **kwargs) 190 with start_span( 191 op="authentik.flow.stage._get_challenge", 192 name=self.__class__.__name__, 193 ): 194 if not hasattr(challenge, "initial_data"): 195 challenge.initial_data = {} 196 if "flow_info" not in challenge.initial_data: 197 # Flow payloads can outlive the previous signed media JWT, so 198 # refreshes must mint fresh URLs instead of reusing cached ones. 199 flow_info = ContextualFlowInfo( 200 data={ 201 "title": self.format_title(), 202 "background": self.executor.flow.background_url(use_cache=False), 203 "background_themed_urls": self.executor.flow.background_themed_urls( 204 use_cache=False, 205 ), 206 "cancel_url": self.cancel_url, 207 "layout": self.executor.flow.layout, 208 } 209 ) 210 flow_info.is_valid() 211 challenge.initial_data["flow_info"] = flow_info.data 212 if isinstance(challenge, WithUserInfoChallenge): 213 # If there's a pending user, update the `username` field 214 # this field is only used by password managers. 215 # If there's no user set, an error is raised later. 216 if user := self.get_pending_user(for_display=True): 217 challenge.initial_data["pending_user"] = user.username 218 challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR 219 if not isinstance(user, AnonymousUser): 220 challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request) 221 return challenge 222 223 def get_challenge(self, *args, **kwargs) -> Challenge: 224 """Return the challenge that the client should solve""" 225 raise NotImplementedError 226 227 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 228 """Callback when the challenge has the correct format""" 229 raise NotImplementedError 230 231 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 232 """Callback when the challenge has the incorrect format""" 233 challenge_response = self._get_challenge() 234 full_errors = {} 235 for field, errors in response.errors.items(): 236 for error in errors: 237 full_errors.setdefault(field, []) 238 field_error = { 239 "string": str(error), 240 } 241 if hasattr(error, "code"): 242 field_error["code"] = error.code 243 full_errors[field].append(field_error) 244 challenge_response.initial_data["response_errors"] = full_errors 245 if not challenge_response.is_valid(): 246 if settings.TEST: 247 raise StageInvalidException( 248 ( 249 f"Invalid challenge response: \n\t{challenge_response.errors}" 250 f"\n\nValidated data:\n\t {challenge_response.data}" 251 f"\n\nInitial data:\n\t {challenge_response.initial_data}" 252 ), 253 ) 254 self.logger.error( 255 "f(ch): invalid challenge response", 256 errors=challenge_response.errors, 257 ) 258 return HttpChallengeResponse(challenge_response) 259 260 261class AccessDeniedStage(ChallengeStageView): 262 """Used internally by FlowExecutor's stage_invalid()""" 263 264 error_message: str | None 265 266 def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs): 267 super().__init__(executor, **kwargs) 268 self.error_message = error_message 269 270 def get_challenge(self, *args, **kwargs) -> Challenge: 271 return AccessDeniedChallenge( 272 data={ 273 "error_message": str(self.error_message or "Unknown error"), 274 "component": "ak-stage-access-denied", 275 } 276 ) 277 278 # This can never be reached since this challenge is created on demand and only the 279 # .get() method is called 280 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 281 return self.executor.cancel() 282 283 284class RedirectStage(ChallengeStageView): 285 """Redirect to any URL""" 286 287 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 288 destination = getattr( 289 self.executor.current_stage, "destination", reverse("authentik_core:root-redirect") 290 ) 291 return RedirectChallenge( 292 data={ 293 "to": destination, 294 } 295 ) 296 297 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 298 return HttpChallengeResponse(self.get_challenge()) 299 300 301class SessionEndStage(ChallengeStageView): 302 """Stage inserted when a flow is used as invalidation flow. By default shows actions 303 that the user is likely to take after signing out of a provider.""" 304 305 def get_challenge(self, *args, **kwargs) -> Challenge: 306 # Check for OIDC post_logout_redirect_uri in context 307 post_logout_redirect_uri = self.executor.plan.context.get( 308 PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI 309 ) 310 311 if post_logout_redirect_uri: 312 self.logger.debug( 313 "SessionEndStage redirecting to post_logout_redirect_uri", 314 redirect_url=post_logout_redirect_uri, 315 ) 316 return RedirectChallenge( 317 data={ 318 "to": post_logout_redirect_uri, 319 }, 320 ) 321 322 if not self.request.user.is_authenticated: 323 # User is logged out with no redirect URI - go to default 324 return RedirectChallenge( 325 data={ 326 "to": reverse("authentik_core:root-redirect"), 327 }, 328 ) 329 application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION) 330 data = { 331 "component": "ak-stage-session-end", 332 "brand_name": self.request.brand.branding_title, 333 } 334 if self.get_pending_user().type == UserTypes.INTERNAL: 335 data["overview_url"] = self.request.build_absolute_uri( 336 reverse("authentik_core:root-redirect") 337 ) 338 if application: 339 data["application_name"] = application.name 340 data["application_launch_url"] = application.get_launch_url(self.get_pending_user()) 341 if self.request.brand.flow_invalidation: 342 data["invalidation_flow_url"] = reverse( 343 "authentik_core:if-flow", 344 kwargs={ 345 "flow_slug": self.request.brand.flow_invalidation.slug, 346 }, 347 ) 348 349 return SessionEndChallenge(data=data) 350 351 # This can never be reached since this challenge is created on demand and only the 352 # .get() method is called 353 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 354 return self.executor.cancel()
48class StageView(View): 49 """Abstract Stage""" 50 51 executor: FlowExecutorView 52 53 request: HttpRequest = None 54 55 logger: BoundLogger 56 57 def __init__(self, executor: FlowExecutorView, **kwargs): 58 self.executor = executor 59 current_stage = getattr(self.executor, "current_stage", None) 60 self.logger = get_logger().bind( 61 stage=getattr(current_stage, "name", None), 62 stage_view=class_to_path(type(self)), 63 ) 64 super().__init__(**kwargs) 65 66 def get_pending_user(self, for_display=False) -> User: 67 """Either show the matched User object or show what the user entered, 68 based on what the earlier stage (mostly IdentificationStage) set. 69 _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for 70 other things besides the form display. 71 72 If no user is pending, returns request.user""" 73 if not self.executor.plan: 74 return self.request.user 75 if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display: 76 return User( 77 username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER), 78 email="", 79 ) 80 if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context: 81 return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 82 return self.request.user 83 84 def cleanup(self): 85 """Cleanup session"""
Abstract Stage
57 def __init__(self, executor: FlowExecutorView, **kwargs): 58 self.executor = executor 59 current_stage = getattr(self.executor, "current_stage", None) 60 self.logger = get_logger().bind( 61 stage=getattr(current_stage, "name", None), 62 stage_view=class_to_path(type(self)), 63 ) 64 super().__init__(**kwargs)
Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.
66 def get_pending_user(self, for_display=False) -> User: 67 """Either show the matched User object or show what the user entered, 68 based on what the earlier stage (mostly IdentificationStage) set. 69 _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for 70 other things besides the form display. 71 72 If no user is pending, returns request.user""" 73 if not self.executor.plan: 74 return self.request.user 75 if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display: 76 return User( 77 username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER), 78 email="", 79 ) 80 if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context: 81 return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 82 return self.request.user
Either show the matched User object or show what the user entered, based on what the earlier stage (mostly IdentificationStage) set. _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for other things besides the form display.
If no user is pending, returns request.user
88class ChallengeStageView(StageView): 89 """Stage view which response with a challenge""" 90 91 response_class = ChallengeResponse 92 93 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 94 """Return the response class type""" 95 return self.response_class(None, data=data, stage=self) 96 97 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 98 """Return a challenge for the frontend to solve""" 99 try: 100 challenge = self._get_challenge(*args, **kwargs) 101 except StageInvalidException as exc: 102 self.logger.debug("Got StageInvalidException", exc=exc) 103 return self.executor.stage_invalid() 104 if not challenge.is_valid(): 105 self.logger.error( 106 "f(ch): Invalid challenge", 107 errors=challenge.errors, 108 challenge=challenge.data, 109 ) 110 return HttpChallengeResponse(challenge) 111 112 def post(self, request: Request, *args, **kwargs) -> HttpResponse: 113 """Handle challenge response""" 114 valid = False 115 try: 116 challenge: ChallengeResponse = self.get_response_instance(data=request.data) 117 valid = challenge.is_valid() 118 except StageInvalidException as exc: 119 self.logger.debug("Got StageInvalidException", exc=exc) 120 return self.executor.stage_invalid() 121 if not valid: 122 if self.executor.current_binding.invalid_response_action in [ 123 InvalidResponseAction.RESTART, 124 InvalidResponseAction.RESTART_WITH_CONTEXT, 125 ]: 126 keep_context = ( 127 self.executor.current_binding.invalid_response_action 128 == InvalidResponseAction.RESTART_WITH_CONTEXT 129 ) 130 self.logger.debug( 131 "f(ch): Invalid response, restarting flow", 132 keep_context=keep_context, 133 ) 134 return self.executor.restart_flow(keep_context) 135 with ( 136 start_span( 137 op="authentik.flow.stage.challenge_invalid", 138 name=self.__class__.__name__, 139 ), 140 HIST_FLOWS_STAGE_TIME.labels( 141 stage_type=self.__class__.__name__, method="challenge_invalid" 142 ).time(), 143 ): 144 return self.challenge_invalid(challenge) 145 with ( 146 start_span( 147 op="authentik.flow.stage.challenge_valid", 148 name=self.__class__.__name__, 149 ), 150 HIST_FLOWS_STAGE_TIME.labels( 151 stage_type=self.__class__.__name__, method="challenge_valid" 152 ).time(), 153 ): 154 return self.challenge_valid(challenge) 155 156 def format_title(self) -> str: 157 """Allow usage of placeholder in flow title.""" 158 if not self.executor.plan: 159 return self.executor.flow.title 160 try: 161 return self.executor.flow.title % { 162 "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""), 163 "user": self.get_pending_user(for_display=True), 164 } 165 166 except Exception as exc: # noqa 167 self.logger.warning("failed to template title", exc=exc) 168 return self.executor.flow.title 169 170 @property 171 def cancel_url(self) -> str: 172 from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET 173 174 next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME) 175 url = reverse("authentik_flows:cancel") 176 if next_param: 177 return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}" 178 return url 179 180 def _get_challenge(self, *args, **kwargs) -> Challenge: 181 with ( 182 start_span( 183 op="authentik.flow.stage.get_challenge", 184 name=self.__class__.__name__, 185 ), 186 HIST_FLOWS_STAGE_TIME.labels( 187 stage_type=self.__class__.__name__, method="get_challenge" 188 ).time(), 189 ): 190 challenge = self.get_challenge(*args, **kwargs) 191 with start_span( 192 op="authentik.flow.stage._get_challenge", 193 name=self.__class__.__name__, 194 ): 195 if not hasattr(challenge, "initial_data"): 196 challenge.initial_data = {} 197 if "flow_info" not in challenge.initial_data: 198 # Flow payloads can outlive the previous signed media JWT, so 199 # refreshes must mint fresh URLs instead of reusing cached ones. 200 flow_info = ContextualFlowInfo( 201 data={ 202 "title": self.format_title(), 203 "background": self.executor.flow.background_url(use_cache=False), 204 "background_themed_urls": self.executor.flow.background_themed_urls( 205 use_cache=False, 206 ), 207 "cancel_url": self.cancel_url, 208 "layout": self.executor.flow.layout, 209 } 210 ) 211 flow_info.is_valid() 212 challenge.initial_data["flow_info"] = flow_info.data 213 if isinstance(challenge, WithUserInfoChallenge): 214 # If there's a pending user, update the `username` field 215 # this field is only used by password managers. 216 # If there's no user set, an error is raised later. 217 if user := self.get_pending_user(for_display=True): 218 challenge.initial_data["pending_user"] = user.username 219 challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR 220 if not isinstance(user, AnonymousUser): 221 challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request) 222 return challenge 223 224 def get_challenge(self, *args, **kwargs) -> Challenge: 225 """Return the challenge that the client should solve""" 226 raise NotImplementedError 227 228 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 229 """Callback when the challenge has the correct format""" 230 raise NotImplementedError 231 232 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 233 """Callback when the challenge has the incorrect format""" 234 challenge_response = self._get_challenge() 235 full_errors = {} 236 for field, errors in response.errors.items(): 237 for error in errors: 238 full_errors.setdefault(field, []) 239 field_error = { 240 "string": str(error), 241 } 242 if hasattr(error, "code"): 243 field_error["code"] = error.code 244 full_errors[field].append(field_error) 245 challenge_response.initial_data["response_errors"] = full_errors 246 if not challenge_response.is_valid(): 247 if settings.TEST: 248 raise StageInvalidException( 249 ( 250 f"Invalid challenge response: \n\t{challenge_response.errors}" 251 f"\n\nValidated data:\n\t {challenge_response.data}" 252 f"\n\nInitial data:\n\t {challenge_response.initial_data}" 253 ), 254 ) 255 self.logger.error( 256 "f(ch): invalid challenge response", 257 errors=challenge_response.errors, 258 ) 259 return HttpChallengeResponse(challenge_response)
Stage view which response with a challenge
93 def get_response_instance(self, data: QueryDict) -> ChallengeResponse: 94 """Return the response class type""" 95 return self.response_class(None, data=data, stage=self)
Return the response class type
97 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 98 """Return a challenge for the frontend to solve""" 99 try: 100 challenge = self._get_challenge(*args, **kwargs) 101 except StageInvalidException as exc: 102 self.logger.debug("Got StageInvalidException", exc=exc) 103 return self.executor.stage_invalid() 104 if not challenge.is_valid(): 105 self.logger.error( 106 "f(ch): Invalid challenge", 107 errors=challenge.errors, 108 challenge=challenge.data, 109 ) 110 return HttpChallengeResponse(challenge)
Return a challenge for the frontend to solve
112 def post(self, request: Request, *args, **kwargs) -> HttpResponse: 113 """Handle challenge response""" 114 valid = False 115 try: 116 challenge: ChallengeResponse = self.get_response_instance(data=request.data) 117 valid = challenge.is_valid() 118 except StageInvalidException as exc: 119 self.logger.debug("Got StageInvalidException", exc=exc) 120 return self.executor.stage_invalid() 121 if not valid: 122 if self.executor.current_binding.invalid_response_action in [ 123 InvalidResponseAction.RESTART, 124 InvalidResponseAction.RESTART_WITH_CONTEXT, 125 ]: 126 keep_context = ( 127 self.executor.current_binding.invalid_response_action 128 == InvalidResponseAction.RESTART_WITH_CONTEXT 129 ) 130 self.logger.debug( 131 "f(ch): Invalid response, restarting flow", 132 keep_context=keep_context, 133 ) 134 return self.executor.restart_flow(keep_context) 135 with ( 136 start_span( 137 op="authentik.flow.stage.challenge_invalid", 138 name=self.__class__.__name__, 139 ), 140 HIST_FLOWS_STAGE_TIME.labels( 141 stage_type=self.__class__.__name__, method="challenge_invalid" 142 ).time(), 143 ): 144 return self.challenge_invalid(challenge) 145 with ( 146 start_span( 147 op="authentik.flow.stage.challenge_valid", 148 name=self.__class__.__name__, 149 ), 150 HIST_FLOWS_STAGE_TIME.labels( 151 stage_type=self.__class__.__name__, method="challenge_valid" 152 ).time(), 153 ): 154 return self.challenge_valid(challenge)
Handle challenge response
156 def format_title(self) -> str: 157 """Allow usage of placeholder in flow title.""" 158 if not self.executor.plan: 159 return self.executor.flow.title 160 try: 161 return self.executor.flow.title % { 162 "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""), 163 "user": self.get_pending_user(for_display=True), 164 } 165 166 except Exception as exc: # noqa 167 self.logger.warning("failed to template title", exc=exc) 168 return self.executor.flow.title
Allow usage of placeholder in flow title.
170 @property 171 def cancel_url(self) -> str: 172 from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET 173 174 next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME) 175 url = reverse("authentik_flows:cancel") 176 if next_param: 177 return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}" 178 return url
224 def get_challenge(self, *args, **kwargs) -> Challenge: 225 """Return the challenge that the client should solve""" 226 raise NotImplementedError
Return the challenge that the client should solve
228 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 229 """Callback when the challenge has the correct format""" 230 raise NotImplementedError
Callback when the challenge has the correct format
232 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 233 """Callback when the challenge has the incorrect format""" 234 challenge_response = self._get_challenge() 235 full_errors = {} 236 for field, errors in response.errors.items(): 237 for error in errors: 238 full_errors.setdefault(field, []) 239 field_error = { 240 "string": str(error), 241 } 242 if hasattr(error, "code"): 243 field_error["code"] = error.code 244 full_errors[field].append(field_error) 245 challenge_response.initial_data["response_errors"] = full_errors 246 if not challenge_response.is_valid(): 247 if settings.TEST: 248 raise StageInvalidException( 249 ( 250 f"Invalid challenge response: \n\t{challenge_response.errors}" 251 f"\n\nValidated data:\n\t {challenge_response.data}" 252 f"\n\nInitial data:\n\t {challenge_response.initial_data}" 253 ), 254 ) 255 self.logger.error( 256 "f(ch): invalid challenge response", 257 errors=challenge_response.errors, 258 ) 259 return HttpChallengeResponse(challenge_response)
Callback when the challenge has the incorrect format
262class AccessDeniedStage(ChallengeStageView): 263 """Used internally by FlowExecutor's stage_invalid()""" 264 265 error_message: str | None 266 267 def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs): 268 super().__init__(executor, **kwargs) 269 self.error_message = error_message 270 271 def get_challenge(self, *args, **kwargs) -> Challenge: 272 return AccessDeniedChallenge( 273 data={ 274 "error_message": str(self.error_message or "Unknown error"), 275 "component": "ak-stage-access-denied", 276 } 277 ) 278 279 # This can never be reached since this challenge is created on demand and only the 280 # .get() method is called 281 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 282 return self.executor.cancel()
Used internally by FlowExecutor's stage_invalid()
267 def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs): 268 super().__init__(executor, **kwargs) 269 self.error_message = error_message
Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.
271 def get_challenge(self, *args, **kwargs) -> Challenge: 272 return AccessDeniedChallenge( 273 data={ 274 "error_message": str(self.error_message or "Unknown error"), 275 "component": "ak-stage-access-denied", 276 } 277 )
Return the challenge that the client should solve
281 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 282 return self.executor.cancel()
Callback when the challenge has the correct format
285class RedirectStage(ChallengeStageView): 286 """Redirect to any URL""" 287 288 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 289 destination = getattr( 290 self.executor.current_stage, "destination", reverse("authentik_core:root-redirect") 291 ) 292 return RedirectChallenge( 293 data={ 294 "to": destination, 295 } 296 ) 297 298 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 299 return HttpChallengeResponse(self.get_challenge())
Redirect to any URL
288 def get_challenge(self, *args, **kwargs) -> RedirectChallenge: 289 destination = getattr( 290 self.executor.current_stage, "destination", reverse("authentik_core:root-redirect") 291 ) 292 return RedirectChallenge( 293 data={ 294 "to": destination, 295 } 296 )
Return the challenge that the client should solve
298 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 299 return HttpChallengeResponse(self.get_challenge())
Callback when the challenge has the correct format
302class SessionEndStage(ChallengeStageView): 303 """Stage inserted when a flow is used as invalidation flow. By default shows actions 304 that the user is likely to take after signing out of a provider.""" 305 306 def get_challenge(self, *args, **kwargs) -> Challenge: 307 # Check for OIDC post_logout_redirect_uri in context 308 post_logout_redirect_uri = self.executor.plan.context.get( 309 PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI 310 ) 311 312 if post_logout_redirect_uri: 313 self.logger.debug( 314 "SessionEndStage redirecting to post_logout_redirect_uri", 315 redirect_url=post_logout_redirect_uri, 316 ) 317 return RedirectChallenge( 318 data={ 319 "to": post_logout_redirect_uri, 320 }, 321 ) 322 323 if not self.request.user.is_authenticated: 324 # User is logged out with no redirect URI - go to default 325 return RedirectChallenge( 326 data={ 327 "to": reverse("authentik_core:root-redirect"), 328 }, 329 ) 330 application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION) 331 data = { 332 "component": "ak-stage-session-end", 333 "brand_name": self.request.brand.branding_title, 334 } 335 if self.get_pending_user().type == UserTypes.INTERNAL: 336 data["overview_url"] = self.request.build_absolute_uri( 337 reverse("authentik_core:root-redirect") 338 ) 339 if application: 340 data["application_name"] = application.name 341 data["application_launch_url"] = application.get_launch_url(self.get_pending_user()) 342 if self.request.brand.flow_invalidation: 343 data["invalidation_flow_url"] = reverse( 344 "authentik_core:if-flow", 345 kwargs={ 346 "flow_slug": self.request.brand.flow_invalidation.slug, 347 }, 348 ) 349 350 return SessionEndChallenge(data=data) 351 352 # This can never be reached since this challenge is created on demand and only the 353 # .get() method is called 354 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 355 return self.executor.cancel()
Stage inserted when a flow is used as invalidation flow. By default shows actions that the user is likely to take after signing out of a provider.
306 def get_challenge(self, *args, **kwargs) -> Challenge: 307 # Check for OIDC post_logout_redirect_uri in context 308 post_logout_redirect_uri = self.executor.plan.context.get( 309 PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI 310 ) 311 312 if post_logout_redirect_uri: 313 self.logger.debug( 314 "SessionEndStage redirecting to post_logout_redirect_uri", 315 redirect_url=post_logout_redirect_uri, 316 ) 317 return RedirectChallenge( 318 data={ 319 "to": post_logout_redirect_uri, 320 }, 321 ) 322 323 if not self.request.user.is_authenticated: 324 # User is logged out with no redirect URI - go to default 325 return RedirectChallenge( 326 data={ 327 "to": reverse("authentik_core:root-redirect"), 328 }, 329 ) 330 application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION) 331 data = { 332 "component": "ak-stage-session-end", 333 "brand_name": self.request.brand.branding_title, 334 } 335 if self.get_pending_user().type == UserTypes.INTERNAL: 336 data["overview_url"] = self.request.build_absolute_uri( 337 reverse("authentik_core:root-redirect") 338 ) 339 if application: 340 data["application_name"] = application.name 341 data["application_launch_url"] = application.get_launch_url(self.get_pending_user()) 342 if self.request.brand.flow_invalidation: 343 data["invalidation_flow_url"] = reverse( 344 "authentik_core:if-flow", 345 kwargs={ 346 "flow_slug": self.request.brand.flow_invalidation.slug, 347 }, 348 ) 349 350 return SessionEndChallenge(data=data)
Return the challenge that the client should solve
354 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: # pragma: no cover 355 return self.executor.cancel()
Callback when the challenge has the correct format