authentik.flows.stage
authentik stage Base view
"""authentik stage Base view"""

from typing import TYPE_CHECKING
from urllib.parse import urlencode

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.http import HttpRequest
from django.http.request import QueryDict
from django.http.response import HttpResponse
from django.urls import reverse
from django.views.generic.base import View
from prometheus_client import Histogram
from rest_framework.request import Request
from sentry_sdk import start_span
from structlog.stdlib import BoundLogger, get_logger

from authentik.core.models import Application, User
from authentik.flows.challenge import (
    AccessDeniedChallenge,
    Challenge,
    ChallengeResponse,
    ContextualFlowInfo,
    HttpChallengeResponse,
    RedirectChallenge,
    SessionEndChallenge,
    WithUserInfoChallenge,
)
from authentik.flows.exceptions import StageInvalidException
from authentik.flows.models import InvalidResponseAction
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
from authentik.lib.avatars import DEFAULT_AVATAR, get_avatar
from authentik.lib.utils.reflection import class_to_path

if TYPE_CHECKING:
    # Needed for annotations only. ``cancel_url`` imports from the same module at
    # call time, presumably to avoid a circular import -- TODO confirm.
    from authentik.flows.views.executor import FlowExecutorView

# Plan-context key holding the identifier the user entered in an earlier stage.
# When present it is preferred over the pending user for display purposes
# (see ``StageView.get_pending_user``).
PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier"
# Histogram timing the stage callbacks, labelled with the stage view's class name
# and the callback being measured ("get_challenge", "challenge_valid",
# "challenge_invalid").
HIST_FLOWS_STAGE_TIME = Histogram(
    "authentik_flows_stage_time",
    "Duration taken by different parts of stages",
    ["stage_type", "method"],
)


class StageView(View):
    """Abstract Stage

    Base class for stage views. Holds a reference to the flow executor that
    instantiated it and a logger bound to the current stage."""

    executor: FlowExecutorView

    request: HttpRequest = None

    logger: BoundLogger

    def __init__(self, executor: FlowExecutorView, **kwargs):
        """Store the executor and create a logger bound to the stage name and view path.

        Remaining keyword arguments are passed on to the parent constructor."""
        self.executor = executor
        # The executor may not have a current stage (or the stage may have no name);
        # both cases fall back to None instead of raising.
        current_stage = getattr(self.executor, "current_stage", None)
        self.logger = get_logger().bind(
            stage=getattr(current_stage, "name", None),
            stage_view=class_to_path(type(self)),
        )
        super().__init__(**kwargs)

    def get_pending_user(self, for_display=False) -> User:
        """Either show the matched User object or show what the user entered,
        based on what the earlier stage (mostly IdentificationStage) set.
        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
        other things besides the form display.

        If no user is pending, returns request.user"""
        if not self.executor.plan:
            return self.request.user
        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
            # Display only: build an in-memory User carrying the entered identifier
            # as username; nothing is saved here.
            return User(
                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
                email="",
            )
        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
        return self.request.user

    def cleanup(self):
        """Cleanup session"""


class ChallengeStageView(StageView):
    """Stage view which responds with a challenge"""

    # Class used by ``get_response_instance`` to wrap submitted data.
    response_class = ChallengeResponse

    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
        """Return an instance of ``response_class`` for the submitted data, bound to this stage"""
        return self.response_class(None, data=data, stage=self)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Return a challenge for the frontend to solve"""
        try:
            challenge = self._get_challenge(*args, **kwargs)
        except StageInvalidException as exc:
            self.logger.debug("Got StageInvalidException", exc=exc)
            return self.executor.stage_invalid()
        # An invalid challenge is only logged; it is still sent to the client.
        if not challenge.is_valid():
            self.logger.error(
                "f(ch): Invalid challenge",
                errors=challenge.errors,
                challenge=challenge.data,
            )
        return HttpChallengeResponse(challenge)

    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
        """Handle challenge response

        Validates the submitted data and dispatches to ``challenge_valid`` or
        ``challenge_invalid``. If the current binding's invalid-response action is
        one of the restart actions, an invalid response restarts the flow instead."""
        valid = False
        try:
            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
            valid = challenge.is_valid()
        except StageInvalidException as exc:
            self.logger.debug("Got StageInvalidException", exc=exc)
            return self.executor.stage_invalid()
        if not valid:
            if self.executor.current_binding.invalid_response_action in [
                InvalidResponseAction.RESTART,
                InvalidResponseAction.RESTART_WITH_CONTEXT,
            ]:
                # Only RESTART_WITH_CONTEXT asks the executor to keep the context.
                keep_context = (
                    self.executor.current_binding.invalid_response_action
                    == InvalidResponseAction.RESTART_WITH_CONTEXT
                )
                self.logger.debug(
                    "f(ch): Invalid response, restarting flow",
                    keep_context=keep_context,
                )
                return self.executor.restart_flow(keep_context)
            with (
                start_span(
                    op="authentik.flow.stage.challenge_invalid",
                    name=self.__class__.__name__,
                ),
                HIST_FLOWS_STAGE_TIME.labels(
                    stage_type=self.__class__.__name__, method="challenge_invalid"
                ).time(),
            ):
                return self.challenge_invalid(challenge)
        with (
            start_span(
                op="authentik.flow.stage.challenge_valid",
                name=self.__class__.__name__,
            ),
            HIST_FLOWS_STAGE_TIME.labels(
                stage_type=self.__class__.__name__, method="challenge_valid"
            ).time(),
        ):
            return self.challenge_valid(challenge)

    def format_title(self) -> str:
        """Allow usage of placeholder in flow title.

        The title is %-formatted with the keys ``app`` and ``user``. Without a
        plan, or if formatting fails for any reason, the raw title is returned."""
        if not self.executor.plan:
            return self.executor.flow.title
        try:
            return self.executor.flow.title % {
                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
                "user": self.get_pending_user(for_display=True),
            }

        except Exception as exc:  # noqa
            # Best effort: a broken placeholder must not break the stage.
            self.logger.warning("failed to template title", exc=exc)
            return self.executor.flow.title

    @property
    def cancel_url(self) -> str:
        """URL of the flow cancel view, carrying over the stored "next" parameter if any"""
        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET

        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
        url = reverse("authentik_flows:cancel")
        if next_param:
            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
        return url

    def _get_challenge(self, *args, **kwargs) -> Challenge:
        """Call ``get_challenge`` and enrich the result's ``initial_data``.

        Adds ``flow_info`` unless the stage already provided it, and for
        ``WithUserInfoChallenge`` instances the pending user's name and avatar."""
        with (
            start_span(
                op="authentik.flow.stage.get_challenge",
                name=self.__class__.__name__,
            ),
            HIST_FLOWS_STAGE_TIME.labels(
                stage_type=self.__class__.__name__, method="get_challenge"
            ).time(),
        ):
            challenge = self.get_challenge(*args, **kwargs)
        with start_span(
            op="authentik.flow.stage._get_challenge",
            name=self.__class__.__name__,
        ):
            if not hasattr(challenge, "initial_data"):
                challenge.initial_data = {}
            if "flow_info" not in challenge.initial_data:
                flow_info = ContextualFlowInfo(
                    data={
                        "title": self.format_title(),
                        "background": self.executor.flow.background_url(self.request),
                        "background_themed_urls": self.executor.flow.background_themed_urls(
                            self.request
                        ),
                        "cancel_url": self.cancel_url,
                        "layout": self.executor.flow.layout,
                    }
                )
                # Validation result is not checked here; ``.data`` is used either way.
                flow_info.is_valid()
                challenge.initial_data["flow_info"] = flow_info.data
            if isinstance(challenge, WithUserInfoChallenge):
                # If there's a pending user, update the `username` field
                # this field is only used by password managers.
                # If there's no user set, an error is raised later.
                if user := self.get_pending_user(for_display=True):
                    challenge.initial_data["pending_user"] = user.username
                    challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
                    if not isinstance(user, AnonymousUser):
                        challenge.initial_data["pending_user_avatar"] = get_avatar(
                            user, self.request
                        )
        return challenge

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the challenge that the client should solve"""
        raise NotImplementedError

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Callback when the challenge has the correct format"""
        raise NotImplementedError

    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
        """Callback when the challenge has the incorrect format

        Re-issues the challenge with the validation errors of ``response`` attached
        under ``response_errors``."""
        challenge_response = self._get_challenge()
        # Maps field name -> list of {"string": ..., "code": ...} entries; "code" is
        # only present when the error object has one.
        full_errors = {}
        for field, errors in response.errors.items():
            for error in errors:
                full_errors.setdefault(field, [])
                field_error = {
                    "string": str(error),
                }
                if hasattr(error, "code"):
                    field_error["code"] = error.code
                full_errors[field].append(field_error)
        challenge_response.initial_data["response_errors"] = full_errors
        if not challenge_response.is_valid():
            # Under test settings an invalid re-issued challenge is raised as an
            # error; otherwise it is only logged and still returned.
            if settings.TEST:
                raise StageInvalidException(
                    (
                        f"Invalid challenge response: \n\t{challenge_response.errors}"
                        f"\n\nValidated data:\n\t {challenge_response.data}"
                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
                    ),
                )
            self.logger.error(
                "f(ch): invalid challenge response",
                errors=challenge_response.errors,
            )
        return HttpChallengeResponse(challenge_response)


class AccessDeniedStage(ChallengeStageView):
    """Used internally by FlowExecutor's stage_invalid()"""

    # Message shown to the user; "Unknown error" is used when it is empty or None.
    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Initialise the stage view and remember the error message to show"""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return an access-denied challenge carrying the error message"""
        return AccessDeniedChallenge(
            data={
                "error_message": str(self.error_message or "Unknown error"),
                "component": "ak-stage-access-denied",
            }
        )

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        return self.executor.cancel()


class RedirectStage(ChallengeStageView):
    """Redirect to any URL"""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Return a redirect challenge to the current stage's ``destination``,
        falling back to the root redirect when the stage has no such attribute"""
        destination = getattr(
            self.executor.current_stage, "destination", reverse("authentik_core:root-redirect")
        )
        return RedirectChallenge(
            data={
                "to": destination,
            }
        )

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Answer any response by sending the redirect challenge again"""
        return HttpChallengeResponse(self.get_challenge())


class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the session-end challenge, or a redirect to the root for
        unauthenticated requests"""
        if not self.request.user.is_authenticated:
            return RedirectChallenge(
                data={
                    "to": reverse("authentik_core:root-redirect"),
                },
            )
        application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
        data = {
            "component": "ak-stage-session-end",
            "brand_name": self.request.brand.branding_title,
        }
        # Application and invalidation-flow details are optional extras.
        if application:
            data["application_name"] = application.name
            data["application_launch_url"] = application.get_launch_url(self.get_pending_user())
        if self.request.brand.flow_invalidation:
            data["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs={
                    "flow_slug": self.request.brand.flow_invalidation.slug,
                },
            )

        return SessionEndChallenge(data=data)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        return self.executor.cancel()
class StageView(View):
    """Abstract base class for stage views.

    Keeps a reference to the flow executor that created the view and a
    structured logger bound to the current stage."""

    executor: FlowExecutorView

    request: HttpRequest = None

    logger: BoundLogger

    def __init__(self, executor: FlowExecutorView, **kwargs):
        """Attach the executor, bind the logger, then run the parent constructor.

        The logger is bound with the current stage's ``name`` (``None`` when the
        executor has no current stage or the stage has no name) and with the
        result of ``class_to_path`` for this view class."""
        self.executor = executor
        stage = getattr(self.executor, "current_stage", None)
        base_logger = get_logger()
        self.logger = base_logger.bind(
            stage=getattr(stage, "name", None),
            stage_view=class_to_path(type(self)),
        )
        super().__init__(**kwargs)

    def get_pending_user(self, for_display=False) -> User:
        """Return the user this flow is currently about.

        Resolution order:

        1. no plan on the executor: ``request.user``
        2. ``for_display`` is set and an earlier stage stored a user identifier:
           a new, in-memory ``User`` whose username is that identifier
        3. a pending user is stored in the plan context: that user
        4. otherwise: ``request.user``"""
        plan = self.executor.plan
        if not plan:
            return self.request.user
        context = plan.context
        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in context and for_display:
            identifier = context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER)
            return User(username=identifier, email="")
        if PLAN_CONTEXT_PENDING_USER in context:
            return context[PLAN_CONTEXT_PENDING_USER]
        return self.request.user

    def cleanup(self):
        """Hook for cleaning up the session; the base implementation does nothing."""
Abstract Stage
def __init__(self, executor: FlowExecutorView, **kwargs):
    """Attach the executor, bind the logger, then run the parent constructor.

    The logger is bound with the current stage's ``name`` (``None`` when the
    executor has no current stage or the stage has no name) and with the
    result of ``class_to_path`` for this view class."""
    self.executor = executor
    stage = getattr(self.executor, "current_stage", None)
    base_logger = get_logger()
    self.logger = base_logger.bind(
        stage=getattr(stage, "name", None),
        stage_view=class_to_path(type(self)),
    )
    super().__init__(**kwargs)
Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.
def get_pending_user(self, for_display=False) -> User:
    """Return the user this flow is currently about.

    Resolution order:

    1. no plan on the executor: ``request.user``
    2. ``for_display`` is set and an earlier stage stored a user identifier:
       a new, in-memory ``User`` whose username is that identifier
    3. a pending user is stored in the plan context: that user
    4. otherwise: ``request.user``"""
    plan = self.executor.plan
    if not plan:
        return self.request.user
    context = plan.context
    if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in context and for_display:
        identifier = context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER)
        return User(username=identifier, email="")
    if PLAN_CONTEXT_PENDING_USER in context:
        return context[PLAN_CONTEXT_PENDING_USER]
    return self.request.user
Either show the matched User object or show what the user entered, based on what the earlier stage (mostly IdentificationStage) set. _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for other things besides the form display.
If no user is pending, returns request.user
class ChallengeStageView(StageView):
    """Stage view which responds with a challenge"""

    response_class = ChallengeResponse

    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
        """Wrap the submitted ``data`` in an instance of ``response_class`` bound to this stage."""
        response_cls = self.response_class
        return response_cls(None, data=data, stage=self)

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Build the challenge for this stage and send it to the frontend.

        A ``StageInvalidException`` raised while building the challenge is handed
        to the executor's ``stage_invalid()``. A challenge that fails validation
        is logged as an error but still returned."""
        try:
            outgoing = self._get_challenge(*args, **kwargs)
        except StageInvalidException as exc:
            self.logger.debug("Got StageInvalidException", exc=exc)
            return self.executor.stage_invalid()
        else:
            if not outgoing.is_valid():
                self.logger.error(
                    "f(ch): Invalid challenge",
                    errors=outgoing.errors,
                    challenge=outgoing.data,
                )
            return HttpChallengeResponse(outgoing)

    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
        """Validate the submitted challenge response and dispatch on the result.

        * valid: ``challenge_valid``
        * invalid and the binding asks for a restart: ``executor.restart_flow``
        * invalid otherwise: ``challenge_invalid``

        A ``StageInvalidException`` during validation is handed to the
        executor's ``stage_invalid()``."""
        stage_type = self.__class__.__name__
        try:
            response = self.get_response_instance(data=request.data)
            response_ok = response.is_valid()
        except StageInvalidException as exc:
            self.logger.debug("Got StageInvalidException", exc=exc)
            return self.executor.stage_invalid()

        if response_ok:
            with (
                start_span(op="authentik.flow.stage.challenge_valid", name=stage_type),
                HIST_FLOWS_STAGE_TIME.labels(
                    stage_type=stage_type, method="challenge_valid"
                ).time(),
            ):
                return self.challenge_valid(response)

        action = self.executor.current_binding.invalid_response_action
        restart_actions = (
            InvalidResponseAction.RESTART,
            InvalidResponseAction.RESTART_WITH_CONTEXT,
        )
        if action in restart_actions:
            # Only RESTART_WITH_CONTEXT asks the executor to keep the context.
            keep_context = action == InvalidResponseAction.RESTART_WITH_CONTEXT
            self.logger.debug(
                "f(ch): Invalid response, restarting flow",
                keep_context=keep_context,
            )
            return self.executor.restart_flow(keep_context)

        with (
            start_span(op="authentik.flow.stage.challenge_invalid", name=stage_type),
            HIST_FLOWS_STAGE_TIME.labels(
                stage_type=stage_type, method="challenge_invalid"
            ).time(),
        ):
            return self.challenge_invalid(response)

    def format_title(self) -> str:
        """Return the flow title with its ``app`` / ``user`` placeholders filled in.

        The title is %-formatted with a mapping, so placeholders look like
        ``%(app)s``. Without a plan, or when formatting fails for any reason,
        the unformatted title is returned."""
        executor = self.executor
        if not executor.plan:
            return executor.flow.title
        try:
            template = executor.flow.title
            values = {
                "app": executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
                "user": self.get_pending_user(for_display=True),
            }
            return template % values
        except Exception as exc:  # noqa
            # Best effort: a broken placeholder must not break the stage.
            self.logger.warning("failed to template title", exc=exc)
            return executor.flow.title

    @property
    def cancel_url(self) -> str:
        """URL of the flow cancel view.

        When the session's stored query arguments contain a "next" value, it is
        appended as a URL-encoded query parameter."""
        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET

        stored_query = self.request.session.get(SESSION_KEY_GET, {})
        target = stored_query.get(NEXT_ARG_NAME)
        cancel = reverse("authentik_flows:cancel")
        if not target:
            return cancel
        query = urlencode({NEXT_ARG_NAME: target})
        return f"{cancel}?{query}"

    def _get_challenge(self, *args, **kwargs) -> Challenge:
        """Call ``get_challenge`` and enrich the result's ``initial_data``.

        ``flow_info`` is added unless the stage already supplied it. For
        ``WithUserInfoChallenge`` instances the pending user's username and
        avatar are added as well."""
        stage_type = self.__class__.__name__
        with (
            start_span(op="authentik.flow.stage.get_challenge", name=stage_type),
            HIST_FLOWS_STAGE_TIME.labels(stage_type=stage_type, method="get_challenge").time(),
        ):
            challenge = self.get_challenge(*args, **kwargs)

        with start_span(op="authentik.flow.stage._get_challenge", name=stage_type):
            if not hasattr(challenge, "initial_data"):
                challenge.initial_data = {}

            if "flow_info" not in challenge.initial_data:
                info_payload = {
                    "title": self.format_title(),
                    "background": self.executor.flow.background_url(self.request),
                    "background_themed_urls": self.executor.flow.background_themed_urls(
                        self.request
                    ),
                    "cancel_url": self.cancel_url,
                    "layout": self.executor.flow.layout,
                }
                flow_info = ContextualFlowInfo(data=info_payload)
                # The validation result is not checked; ``.data`` is used either way.
                flow_info.is_valid()
                challenge.initial_data["flow_info"] = flow_info.data

            if isinstance(challenge, WithUserInfoChallenge):
                # The username is only used by password managers. If no user is
                # set, an error is raised later on.
                user = self.get_pending_user(for_display=True)
                if user:
                    challenge.initial_data["pending_user"] = user.username
                    challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
                    if not isinstance(user, AnonymousUser):
                        avatar = get_avatar(user, self.request)
                        challenge.initial_data["pending_user_avatar"] = avatar
        return challenge

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the challenge the client has to solve.

        Must be implemented by subclasses; the base implementation raises
        ``NotImplementedError``."""
        raise NotImplementedError

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Handle a challenge response that passed validation.

        Must be implemented by subclasses; the base implementation raises
        ``NotImplementedError``."""
        raise NotImplementedError

    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
        """Handle a challenge response that failed validation.

        Re-issues the challenge with the errors of ``response`` attached under
        ``response_errors`` (field name -> list of ``{"string", "code"}``
        entries, ``code`` only when the error has one).

        Raises ``StageInvalidException`` when the re-issued challenge itself is
        invalid and ``settings.TEST`` is set; otherwise that case is only logged."""
        challenge = self._get_challenge()
        collected = {}
        for field, errors in response.errors.items():
            for error in errors:
                entry = {"string": str(error)}
                if hasattr(error, "code"):
                    entry["code"] = error.code
                collected.setdefault(field, []).append(entry)
        challenge.initial_data["response_errors"] = collected

        if challenge.is_valid():
            return HttpChallengeResponse(challenge)
        if settings.TEST:
            raise StageInvalidException(
                (
                    f"Invalid challenge response: \n\t{challenge.errors}"
                    f"\n\nValidated data:\n\t {challenge.data}"
                    f"\n\nInitial data:\n\t {challenge.initial_data}"
                ),
            )
        self.logger.error(
            "f(ch): invalid challenge response",
            errors=challenge.errors,
        )
        return HttpChallengeResponse(challenge)
Stage view which responds with a challenge
def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
    """Wrap the submitted ``data`` in an instance of ``response_class`` bound to this stage."""
    response_cls = self.response_class
    return response_cls(None, data=data, stage=self)
Return an instance of the response class for the submitted data
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Build the challenge for this stage and send it to the frontend.

    A ``StageInvalidException`` raised while building the challenge is handed
    to the executor's ``stage_invalid()``. A challenge that fails validation
    is logged as an error but still returned."""
    try:
        outgoing = self._get_challenge(*args, **kwargs)
    except StageInvalidException as exc:
        self.logger.debug("Got StageInvalidException", exc=exc)
        return self.executor.stage_invalid()
    else:
        if not outgoing.is_valid():
            self.logger.error(
                "f(ch): Invalid challenge",
                errors=outgoing.errors,
                challenge=outgoing.data,
            )
        return HttpChallengeResponse(outgoing)
Return a challenge for the frontend to solve
def post(self, request: Request, *args, **kwargs) -> HttpResponse:
    """Validate the submitted challenge response and dispatch on the result.

    * valid: ``challenge_valid``
    * invalid and the binding asks for a restart: ``executor.restart_flow``
    * invalid otherwise: ``challenge_invalid``

    A ``StageInvalidException`` during validation is handed to the
    executor's ``stage_invalid()``."""
    stage_type = self.__class__.__name__
    try:
        response = self.get_response_instance(data=request.data)
        response_ok = response.is_valid()
    except StageInvalidException as exc:
        self.logger.debug("Got StageInvalidException", exc=exc)
        return self.executor.stage_invalid()

    if response_ok:
        with (
            start_span(op="authentik.flow.stage.challenge_valid", name=stage_type),
            HIST_FLOWS_STAGE_TIME.labels(stage_type=stage_type, method="challenge_valid").time(),
        ):
            return self.challenge_valid(response)

    action = self.executor.current_binding.invalid_response_action
    restart_actions = (
        InvalidResponseAction.RESTART,
        InvalidResponseAction.RESTART_WITH_CONTEXT,
    )
    if action in restart_actions:
        # Only RESTART_WITH_CONTEXT asks the executor to keep the context.
        keep_context = action == InvalidResponseAction.RESTART_WITH_CONTEXT
        self.logger.debug(
            "f(ch): Invalid response, restarting flow",
            keep_context=keep_context,
        )
        return self.executor.restart_flow(keep_context)

    with (
        start_span(op="authentik.flow.stage.challenge_invalid", name=stage_type),
        HIST_FLOWS_STAGE_TIME.labels(stage_type=stage_type, method="challenge_invalid").time(),
    ):
        return self.challenge_invalid(response)
Handle challenge response
def format_title(self) -> str:
    """Return the flow title with its ``app`` / ``user`` placeholders filled in.

    The title is %-formatted with a mapping, so placeholders look like
    ``%(app)s``. Without a plan, or when formatting fails for any reason,
    the unformatted title is returned."""
    executor = self.executor
    if not executor.plan:
        return executor.flow.title
    try:
        template = executor.flow.title
        values = {
            "app": executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
            "user": self.get_pending_user(for_display=True),
        }
        return template % values
    except Exception as exc:  # noqa
        # Best effort: a broken placeholder must not break the stage.
        self.logger.warning("failed to template title", exc=exc)
        return executor.flow.title
Allow usage of placeholder in flow title.
@property
def cancel_url(self) -> str:
    """URL of the flow cancel view.

    When the session's stored query arguments contain a "next" value, it is
    appended as a URL-encoded query parameter."""
    from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET

    stored_query = self.request.session.get(SESSION_KEY_GET, {})
    target = stored_query.get(NEXT_ARG_NAME)
    cancel = reverse("authentik_flows:cancel")
    if not target:
        return cancel
    query = urlencode({NEXT_ARG_NAME: target})
    return f"{cancel}?{query}"
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Build the challenge the client has to solve.

    Must be implemented by subclasses; the base implementation raises
    ``NotImplementedError``."""
    raise NotImplementedError
Return the challenge that the client should solve
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Handle a challenge response that passed validation.

    Must be implemented by subclasses; the base implementation raises
    ``NotImplementedError``."""
    raise NotImplementedError
Callback when the challenge has the correct format
def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
    """Handle a challenge response that failed validation.

    Re-issues the challenge with the errors of ``response`` attached under
    ``response_errors`` (field name -> list of ``{"string", "code"}``
    entries, ``code`` only when the error has one).

    Raises ``StageInvalidException`` when the re-issued challenge itself is
    invalid and ``settings.TEST`` is set; otherwise that case is only logged."""
    challenge = self._get_challenge()
    collected = {}
    for field, errors in response.errors.items():
        for error in errors:
            entry = {"string": str(error)}
            if hasattr(error, "code"):
                entry["code"] = error.code
            collected.setdefault(field, []).append(entry)
    challenge.initial_data["response_errors"] = collected

    if challenge.is_valid():
        return HttpChallengeResponse(challenge)
    if settings.TEST:
        raise StageInvalidException(
            (
                f"Invalid challenge response: \n\t{challenge.errors}"
                f"\n\nValidated data:\n\t {challenge.data}"
                f"\n\nInitial data:\n\t {challenge.initial_data}"
            ),
        )
    self.logger.error(
        "f(ch): invalid challenge response",
        errors=challenge.errors,
    )
    return HttpChallengeResponse(challenge)
Callback when the challenge has the incorrect format
class AccessDeniedStage(ChallengeStageView):
    """Stage that shows an access-denied message.

    Used internally by FlowExecutor's stage_invalid()"""

    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Initialise the stage view, then remember the message to display."""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return an access-denied challenge carrying the error message.

        "Unknown error" is used when no (or an empty) message was given."""
        message = self.error_message or "Unknown error"
        payload = {
            "error_message": str(message),
            "component": "ak-stage-access-denied",
        }
        return AccessDeniedChallenge(data=payload)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        cancelled = self.executor.cancel()
        return cancelled
Used internally by FlowExecutor's stage_invalid()
def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
    """Initialise the stage view, then remember the message to display.

    ``executor`` and any extra keyword arguments are passed to the parent
    constructor; ``error_message`` may be ``None``."""
    super().__init__(executor, **kwargs)
    self.error_message = error_message
Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Return an access-denied challenge carrying the error message.

    "Unknown error" is used when no (or an empty) message was given."""
    message = self.error_message or "Unknown error"
    payload = {
        "error_message": str(message),
        "component": "ak-stage-access-denied",
    }
    return AccessDeniedChallenge(data=payload)
Return the challenge that the client should solve
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
    """Cancel the flow through the executor; ``response`` is not inspected."""
    cancelled = self.executor.cancel()
    return cancelled
Callback when the challenge has the correct format
class RedirectStage(ChallengeStageView):
    """Stage that redirects the client to a URL"""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Return a redirect challenge to the current stage's ``destination``.

        Falls back to the root redirect URL when the current stage has no
        ``destination`` attribute."""
        stage = self.executor.current_stage
        fallback = reverse("authentik_core:root-redirect")
        target = getattr(stage, "destination", fallback)
        return RedirectChallenge(data={"to": target})

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Answer any response by sending the redirect challenge again."""
        redirect = self.get_challenge()
        return HttpChallengeResponse(redirect)
Redirect to any URL
def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
    """Return a redirect challenge to the current stage's ``destination``.

    Falls back to the root redirect URL when the current stage has no
    ``destination`` attribute."""
    stage = self.executor.current_stage
    fallback = reverse("authentik_core:root-redirect")
    target = getattr(stage, "destination", fallback)
    return RedirectChallenge(data={"to": target})
Return the challenge that the client should solve
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Answer any response by sending the redirect challenge again."""
    redirect = self.get_challenge()
    return HttpChallengeResponse(redirect)
Callback when the challenge has the correct format
class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the session-end challenge.

        Unauthenticated requests get a redirect to the root instead. The
        application name/launch URL and the brand's invalidation flow URL are
        only included when the plan context has an application and the brand
        has an invalidation flow, respectively."""
        request = self.request
        if not request.user.is_authenticated:
            home = reverse("authentik_core:root-redirect")
            return RedirectChallenge(data={"to": home})

        application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
        data = {
            "component": "ak-stage-session-end",
            "brand_name": request.brand.branding_title,
        }
        if application:
            data["application_name"] = application.name
            data["application_launch_url"] = application.get_launch_url(self.get_pending_user())

        invalidation_flow = request.brand.flow_invalidation
        if invalidation_flow:
            data["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs={"flow_slug": invalidation_flow.slug},
            )
        return SessionEndChallenge(data=data)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        cancelled = self.executor.cancel()
        return cancelled
Stage inserted when a flow is used as invalidation flow. By default shows actions that the user is likely to take after signing out of a provider.
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Return the session-end challenge.

    Unauthenticated requests get a redirect to the root instead. The
    application name/launch URL and the brand's invalidation flow URL are
    only included when the plan context has an application and the brand
    has an invalidation flow, respectively."""
    request = self.request
    if not request.user.is_authenticated:
        home = reverse("authentik_core:root-redirect")
        return RedirectChallenge(data={"to": home})

    application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
    data = {
        "component": "ak-stage-session-end",
        "brand_name": request.brand.branding_title,
    }
    if application:
        data["application_name"] = application.name
        data["application_launch_url"] = application.get_launch_url(self.get_pending_user())

    invalidation_flow = request.brand.flow_invalidation
    if invalidation_flow:
        data["invalidation_flow_url"] = reverse(
            "authentik_core:if-flow",
            kwargs={"flow_slug": invalidation_flow.slug},
        )
    return SessionEndChallenge(data=data)
Return the challenge that the client should solve
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
    """Cancel the flow through the executor; ``response`` is not inspected."""
    cancelled = self.executor.cancel()
    return cancelled
Callback when the challenge has the correct format