authentik.flows.stage

authentik stage Base view

  1"""authentik stage Base view"""
  2
  3from typing import TYPE_CHECKING
  4from urllib.parse import urlencode
  5
  6from django.conf import settings
  7from django.contrib.auth.models import AnonymousUser
  8from django.http import HttpRequest
  9from django.http.request import QueryDict
 10from django.http.response import HttpResponse
 11from django.urls import reverse
 12from django.views.generic.base import View
 13from prometheus_client import Histogram
 14from rest_framework.request import Request
 15from sentry_sdk import start_span
 16from structlog.stdlib import BoundLogger, get_logger
 17
 18from authentik.common.oauth.constants import PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI
 19from authentik.core.models import Application, User, UserTypes
 20from authentik.flows.challenge import (
 21    AccessDeniedChallenge,
 22    Challenge,
 23    ChallengeResponse,
 24    ContextualFlowInfo,
 25    HttpChallengeResponse,
 26    RedirectChallenge,
 27    SessionEndChallenge,
 28    WithUserInfoChallenge,
 29)
 30from authentik.flows.exceptions import StageInvalidException
 31from authentik.flows.models import InvalidResponseAction
 32from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
 33from authentik.lib.avatars import DEFAULT_AVATAR, get_avatar
 34from authentik.lib.utils.reflection import class_to_path
 35
 36if TYPE_CHECKING:
 37    from authentik.flows.views.executor import FlowExecutorView
 38
 39PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier"  # plan-context key read by StageView.get_pending_user(for_display=True)
 40HIST_FLOWS_STAGE_TIME = Histogram(  # timings recorded around get_challenge / challenge_valid / challenge_invalid
 41    "authentik_flows_stage_time",
 42    "Duration taken by different parts of stages",
 43    ["stage_type", "method"],  # label names: stage view class name and the handler being timed
 44)
 45
 46
 47class StageView(View):
 48    """Abstract Stage"""
 49
 50    executor: FlowExecutorView
 51
 52    request: HttpRequest = None
 53
 54    logger: BoundLogger
 55
 56    def __init__(self, executor: FlowExecutorView, **kwargs):
 57        self.executor = executor
 58        current_stage = getattr(self.executor, "current_stage", None)
 59        self.logger = get_logger().bind(
 60            stage=getattr(current_stage, "name", None),
 61            stage_view=class_to_path(type(self)),
 62        )
 63        super().__init__(**kwargs)
 64
 65    def get_pending_user(self, for_display=False) -> User:
 66        """Either show the matched User object or show what the user entered,
 67        based on what the earlier stage (mostly IdentificationStage) set.
 68        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
 69        other things besides the form display.
 70
 71        If no user is pending, returns request.user"""
 72        if not self.executor.plan:
 73            return self.request.user
 74        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
 75            return User(
 76                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
 77                email="",
 78            )
 79        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
 80            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
 81        return self.request.user
 82
 83    def cleanup(self):
 84        """Cleanup session"""
 85
 86
 87class ChallengeStageView(StageView):
 88    """Stage view which response with a challenge"""
 89
 90    response_class = ChallengeResponse
 91
 92    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 93        """Return the response class type"""
 94        return self.response_class(None, data=data, stage=self)
 95
 96    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 97        """Return a challenge for the frontend to solve"""
 98        try:
 99            challenge = self._get_challenge(*args, **kwargs)
100        except StageInvalidException as exc:
101            self.logger.debug("Got StageInvalidException", exc=exc)
102            return self.executor.stage_invalid()
103        if not challenge.is_valid():
104            self.logger.error(
105                "f(ch): Invalid challenge",
106                errors=challenge.errors,
107                challenge=challenge.data,
108            )
109        return HttpChallengeResponse(challenge)
110
111    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
112        """Handle challenge response"""
113        valid = False
114        try:
115            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
116            valid = challenge.is_valid()
117        except StageInvalidException as exc:
118            self.logger.debug("Got StageInvalidException", exc=exc)
119            return self.executor.stage_invalid()
120        if not valid:
121            if self.executor.current_binding.invalid_response_action in [
122                InvalidResponseAction.RESTART,
123                InvalidResponseAction.RESTART_WITH_CONTEXT,
124            ]:
125                keep_context = (
126                    self.executor.current_binding.invalid_response_action
127                    == InvalidResponseAction.RESTART_WITH_CONTEXT
128                )
129                self.logger.debug(
130                    "f(ch): Invalid response, restarting flow",
131                    keep_context=keep_context,
132                )
133                return self.executor.restart_flow(keep_context)
134            with (
135                start_span(
136                    op="authentik.flow.stage.challenge_invalid",
137                    name=self.__class__.__name__,
138                ),
139                HIST_FLOWS_STAGE_TIME.labels(
140                    stage_type=self.__class__.__name__, method="challenge_invalid"
141                ).time(),
142            ):
143                return self.challenge_invalid(challenge)
144        with (
145            start_span(
146                op="authentik.flow.stage.challenge_valid",
147                name=self.__class__.__name__,
148            ),
149            HIST_FLOWS_STAGE_TIME.labels(
150                stage_type=self.__class__.__name__, method="challenge_valid"
151            ).time(),
152        ):
153            return self.challenge_valid(challenge)
154
155    def format_title(self) -> str:
156        """Allow usage of placeholder in flow title."""
157        if not self.executor.plan:
158            return self.executor.flow.title
159        try:
160            return self.executor.flow.title % {
161                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
162                "user": self.get_pending_user(for_display=True),
163            }
164
165        except Exception as exc:  # noqa
166            self.logger.warning("failed to template title", exc=exc)
167            return self.executor.flow.title
168
169    @property
170    def cancel_url(self) -> str:
171        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
172
173        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
174        url = reverse("authentik_flows:cancel")
175        if next_param:
176            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
177        return url
178
179    def _get_challenge(self, *args, **kwargs) -> Challenge:
180        with (
181            start_span(
182                op="authentik.flow.stage.get_challenge",
183                name=self.__class__.__name__,
184            ),
185            HIST_FLOWS_STAGE_TIME.labels(
186                stage_type=self.__class__.__name__, method="get_challenge"
187            ).time(),
188        ):
189            challenge = self.get_challenge(*args, **kwargs)
190        with start_span(
191            op="authentik.flow.stage._get_challenge",
192            name=self.__class__.__name__,
193        ):
194            if not hasattr(challenge, "initial_data"):
195                challenge.initial_data = {}
196            if "flow_info" not in challenge.initial_data:
197                # Flow payloads can outlive the previous signed media JWT, so
198                # refreshes must mint fresh URLs instead of reusing cached ones.
199                flow_info = ContextualFlowInfo(
200                    data={
201                        "title": self.format_title(),
202                        "background": self.executor.flow.background_url(use_cache=False),
203                        "background_themed_urls": self.executor.flow.background_themed_urls(
204                            use_cache=False,
205                        ),
206                        "cancel_url": self.cancel_url,
207                        "layout": self.executor.flow.layout,
208                    }
209                )
210                flow_info.is_valid()
211                challenge.initial_data["flow_info"] = flow_info.data
212            if isinstance(challenge, WithUserInfoChallenge):
213                # If there's a pending user, update the `username` field
214                # this field is only used by password managers.
215                # If there's no user set, an error is raised later.
216                if user := self.get_pending_user(for_display=True):
217                    challenge.initial_data["pending_user"] = user.username
218                challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
219                if not isinstance(user, AnonymousUser):
220                    challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request)
221        return challenge
222
223    def get_challenge(self, *args, **kwargs) -> Challenge:
224        """Return the challenge that the client should solve"""
225        raise NotImplementedError
226
227    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
228        """Callback when the challenge has the correct format"""
229        raise NotImplementedError
230
231    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
232        """Callback when the challenge has the incorrect format"""
233        challenge_response = self._get_challenge()
234        full_errors = {}
235        for field, errors in response.errors.items():
236            for error in errors:
237                full_errors.setdefault(field, [])
238                field_error = {
239                    "string": str(error),
240                }
241                if hasattr(error, "code"):
242                    field_error["code"] = error.code
243                full_errors[field].append(field_error)
244        challenge_response.initial_data["response_errors"] = full_errors
245        if not challenge_response.is_valid():
246            if settings.TEST:
247                raise StageInvalidException(
248                    (
249                        f"Invalid challenge response: \n\t{challenge_response.errors}"
250                        f"\n\nValidated data:\n\t {challenge_response.data}"
251                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
252                    ),
253                )
254            self.logger.error(
255                "f(ch): invalid challenge response",
256                errors=challenge_response.errors,
257            )
258        return HttpChallengeResponse(challenge_response)
259
260
class AccessDeniedStage(ChallengeStageView):
    """Used internally by FlowExecutor's stage_invalid()"""

    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Initialise the stage view, then store the optional message to show."""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return an access-denied challenge carrying the stored message,
        or "Unknown error" when none was given."""
        message = self.error_message or "Unknown error"
        payload = {
            "error_message": str(message),
            "component": "ak-stage-access-denied",
        }
        return AccessDeniedChallenge(data=payload)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        return self.executor.cancel()
282
283
class RedirectStage(ChallengeStageView):
    """Redirect to any URL"""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Return a redirect to the current stage's ``destination``, falling
        back to the core root redirect when the stage has none."""
        fallback = reverse("authentik_core:root-redirect")
        destination = getattr(self.executor.current_stage, "destination", fallback)
        payload = {
            "to": destination,
        }
        return RedirectChallenge(data=payload)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Answer a submitted response with the same redirect challenge again."""
        challenge = self.get_challenge()
        return HttpChallengeResponse(challenge)
299
300
class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return a redirect when a post-logout URI is set or the user is
        already logged out, otherwise the session-end challenge."""
        context = self.executor.plan.context

        # Check for OIDC post_logout_redirect_uri in context
        post_logout_redirect_uri = context.get(PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI)
        if post_logout_redirect_uri:
            self.logger.debug(
                "SessionEndStage redirecting to post_logout_redirect_uri",
                redirect_url=post_logout_redirect_uri,
            )
            return RedirectChallenge(
                data={
                    "to": post_logout_redirect_uri,
                },
            )

        if not self.request.user.is_authenticated:
            # User is logged out with no redirect URI - go to default
            return RedirectChallenge(
                data={
                    "to": reverse("authentik_core:root-redirect"),
                },
            )

        brand = self.request.brand
        pending_user = self.get_pending_user()
        data = {
            "component": "ak-stage-session-end",
            "brand_name": brand.branding_title,
        }
        if pending_user.type == UserTypes.INTERNAL:
            overview_path = reverse("authentik_core:root-redirect")
            data["overview_url"] = self.request.build_absolute_uri(overview_path)
        application: Application | None = context.get(PLAN_CONTEXT_APPLICATION)
        if application:
            data["application_name"] = application.name
            data["application_launch_url"] = application.get_launch_url(pending_user)
        invalidation_flow = brand.flow_invalidation
        if invalidation_flow:
            data["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs={
                    "flow_slug": invalidation_flow.slug,
                },
            )

        return SessionEndChallenge(data=data)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        return self.executor.cancel()
PLAN_CONTEXT_PENDING_USER_IDENTIFIER = 'pending_user_identifier'
HIST_FLOWS_STAGE_TIME = prometheus_client.metrics.Histogram(authentik_flows_stage_time)
class StageView(django.views.generic.base.View):
class StageView(View):
    """Abstract base for stage views.

    Keeps a reference to the flow executor and a logger bound to the
    current stage's name and this view's import path.
    """

    executor: FlowExecutorView

    request: HttpRequest = None

    logger: BoundLogger

    def __init__(self, executor: FlowExecutorView, **kwargs):
        """Remember the executor and bind the stage logger before handing
        the remaining keyword arguments to the parent view."""
        self.executor = executor
        # The executor may not expose a current stage; fall back to None.
        stage = getattr(executor, "current_stage", None)
        stage_name = getattr(stage, "name", None)
        self.logger = get_logger().bind(
            stage=stage_name,
            stage_view=class_to_path(type(self)),
        )
        super().__init__(**kwargs)

    def get_pending_user(self, for_display=False) -> User:
        """Return the user this flow is currently about.

        With ``for_display`` set and an identifier stored under
        PLAN_CONTEXT_PENDING_USER_IDENTIFIER, an unsaved User carrying that
        identifier as username is returned, so forms show what was typed in.
        Otherwise the PLAN_CONTEXT_PENDING_USER entry is returned when present.

        Without a plan, or without any pending user, returns request.user."""
        plan = self.executor.plan
        if not plan:
            return self.request.user
        context = plan.context
        if for_display and PLAN_CONTEXT_PENDING_USER_IDENTIFIER in context:
            identifier = context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER)
            return User(username=identifier, email="")
        if PLAN_CONTEXT_PENDING_USER in context:
            return context[PLAN_CONTEXT_PENDING_USER]
        return self.request.user

    def cleanup(self):
        """Cleanup session. The base implementation does nothing."""

Abstract Stage

StageView(executor: authentik.flows.views.executor.FlowExecutorView, **kwargs)
57    def __init__(self, executor: FlowExecutorView, **kwargs):
58        self.executor = executor
59        current_stage = getattr(self.executor, "current_stage", None)
60        self.logger = get_logger().bind(
61            stage=getattr(current_stage, "name", None),
62            stage_view=class_to_path(type(self)),
63        )
64        super().__init__(**kwargs)

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

request: django.http.request.HttpRequest = None
logger: structlog.stdlib.BoundLogger
def get_pending_user(self, for_display=False) -> authentik.core.models.User:
66    def get_pending_user(self, for_display=False) -> User:
67        """Either show the matched User object or show what the user entered,
68        based on what the earlier stage (mostly IdentificationStage) set.
69        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
70        other things besides the form display.
71
72        If no user is pending, returns request.user"""
73        if not self.executor.plan:
74            return self.request.user
75        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
76            return User(
77                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
78                email="",
79            )
80        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
81            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
82        return self.request.user

Either show the matched User object or show what the user entered, based on what the earlier stage (mostly IdentificationStage) set. _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for other things besides the form display.

If no user is pending, returns request.user

def cleanup(self):
84    def cleanup(self):
85        """Cleanup session"""

Cleanup session

class ChallengeStageView(StageView):
 88class ChallengeStageView(StageView):
 89    """Stage view which response with a challenge"""
 90
 91    response_class = ChallengeResponse
 92
 93    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 94        """Return the response class type"""
 95        return self.response_class(None, data=data, stage=self)
 96
 97    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 98        """Return a challenge for the frontend to solve"""
 99        try:
100            challenge = self._get_challenge(*args, **kwargs)
101        except StageInvalidException as exc:
102            self.logger.debug("Got StageInvalidException", exc=exc)
103            return self.executor.stage_invalid()
104        if not challenge.is_valid():
105            self.logger.error(
106                "f(ch): Invalid challenge",
107                errors=challenge.errors,
108                challenge=challenge.data,
109            )
110        return HttpChallengeResponse(challenge)
111
112    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
113        """Handle challenge response"""
114        valid = False
115        try:
116            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
117            valid = challenge.is_valid()
118        except StageInvalidException as exc:
119            self.logger.debug("Got StageInvalidException", exc=exc)
120            return self.executor.stage_invalid()
121        if not valid:
122            if self.executor.current_binding.invalid_response_action in [
123                InvalidResponseAction.RESTART,
124                InvalidResponseAction.RESTART_WITH_CONTEXT,
125            ]:
126                keep_context = (
127                    self.executor.current_binding.invalid_response_action
128                    == InvalidResponseAction.RESTART_WITH_CONTEXT
129                )
130                self.logger.debug(
131                    "f(ch): Invalid response, restarting flow",
132                    keep_context=keep_context,
133                )
134                return self.executor.restart_flow(keep_context)
135            with (
136                start_span(
137                    op="authentik.flow.stage.challenge_invalid",
138                    name=self.__class__.__name__,
139                ),
140                HIST_FLOWS_STAGE_TIME.labels(
141                    stage_type=self.__class__.__name__, method="challenge_invalid"
142                ).time(),
143            ):
144                return self.challenge_invalid(challenge)
145        with (
146            start_span(
147                op="authentik.flow.stage.challenge_valid",
148                name=self.__class__.__name__,
149            ),
150            HIST_FLOWS_STAGE_TIME.labels(
151                stage_type=self.__class__.__name__, method="challenge_valid"
152            ).time(),
153        ):
154            return self.challenge_valid(challenge)
155
156    def format_title(self) -> str:
157        """Allow usage of placeholder in flow title."""
158        if not self.executor.plan:
159            return self.executor.flow.title
160        try:
161            return self.executor.flow.title % {
162                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
163                "user": self.get_pending_user(for_display=True),
164            }
165
166        except Exception as exc:  # noqa
167            self.logger.warning("failed to template title", exc=exc)
168            return self.executor.flow.title
169
170    @property
171    def cancel_url(self) -> str:
172        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
173
174        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
175        url = reverse("authentik_flows:cancel")
176        if next_param:
177            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
178        return url
179
180    def _get_challenge(self, *args, **kwargs) -> Challenge:
181        with (
182            start_span(
183                op="authentik.flow.stage.get_challenge",
184                name=self.__class__.__name__,
185            ),
186            HIST_FLOWS_STAGE_TIME.labels(
187                stage_type=self.__class__.__name__, method="get_challenge"
188            ).time(),
189        ):
190            challenge = self.get_challenge(*args, **kwargs)
191        with start_span(
192            op="authentik.flow.stage._get_challenge",
193            name=self.__class__.__name__,
194        ):
195            if not hasattr(challenge, "initial_data"):
196                challenge.initial_data = {}
197            if "flow_info" not in challenge.initial_data:
198                # Flow payloads can outlive the previous signed media JWT, so
199                # refreshes must mint fresh URLs instead of reusing cached ones.
200                flow_info = ContextualFlowInfo(
201                    data={
202                        "title": self.format_title(),
203                        "background": self.executor.flow.background_url(use_cache=False),
204                        "background_themed_urls": self.executor.flow.background_themed_urls(
205                            use_cache=False,
206                        ),
207                        "cancel_url": self.cancel_url,
208                        "layout": self.executor.flow.layout,
209                    }
210                )
211                flow_info.is_valid()
212                challenge.initial_data["flow_info"] = flow_info.data
213            if isinstance(challenge, WithUserInfoChallenge):
214                # If there's a pending user, update the `username` field
215                # this field is only used by password managers.
216                # If there's no user set, an error is raised later.
217                if user := self.get_pending_user(for_display=True):
218                    challenge.initial_data["pending_user"] = user.username
219                challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
220                if not isinstance(user, AnonymousUser):
221                    challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request)
222        return challenge
223
224    def get_challenge(self, *args, **kwargs) -> Challenge:
225        """Return the challenge that the client should solve"""
226        raise NotImplementedError
227
228    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
229        """Callback when the challenge has the correct format"""
230        raise NotImplementedError
231
232    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
233        """Callback when the challenge has the incorrect format"""
234        challenge_response = self._get_challenge()
235        full_errors = {}
236        for field, errors in response.errors.items():
237            for error in errors:
238                full_errors.setdefault(field, [])
239                field_error = {
240                    "string": str(error),
241                }
242                if hasattr(error, "code"):
243                    field_error["code"] = error.code
244                full_errors[field].append(field_error)
245        challenge_response.initial_data["response_errors"] = full_errors
246        if not challenge_response.is_valid():
247            if settings.TEST:
248                raise StageInvalidException(
249                    (
250                        f"Invalid challenge response: \n\t{challenge_response.errors}"
251                        f"\n\nValidated data:\n\t {challenge_response.data}"
252                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
253                    ),
254                )
255            self.logger.error(
256                "f(ch): invalid challenge response",
257                errors=challenge_response.errors,
258            )
259        return HttpChallengeResponse(challenge_response)

Stage view which responds with a challenge

response_class = <class 'authentik.flows.challenge.ChallengeResponse'>
def get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
93    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
94        """Return the response class type"""
95        return self.response_class(None, data=data, stage=self)

Return an instance of the response class, built from the submitted data and bound to this stage

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
 97    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 98        """Return a challenge for the frontend to solve"""
 99        try:
100            challenge = self._get_challenge(*args, **kwargs)
101        except StageInvalidException as exc:
102            self.logger.debug("Got StageInvalidException", exc=exc)
103            return self.executor.stage_invalid()
104        if not challenge.is_valid():
105            self.logger.error(
106                "f(ch): Invalid challenge",
107                errors=challenge.errors,
108                challenge=challenge.data,
109            )
110        return HttpChallengeResponse(challenge)

Return a challenge for the frontend to solve

def post( self, request: rest_framework.request.Request, *args, **kwargs) -> django.http.response.HttpResponse:
112    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
113        """Handle challenge response"""
114        valid = False
115        try:
116            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
117            valid = challenge.is_valid()
118        except StageInvalidException as exc:
119            self.logger.debug("Got StageInvalidException", exc=exc)
120            return self.executor.stage_invalid()
121        if not valid:
122            if self.executor.current_binding.invalid_response_action in [
123                InvalidResponseAction.RESTART,
124                InvalidResponseAction.RESTART_WITH_CONTEXT,
125            ]:
126                keep_context = (
127                    self.executor.current_binding.invalid_response_action
128                    == InvalidResponseAction.RESTART_WITH_CONTEXT
129                )
130                self.logger.debug(
131                    "f(ch): Invalid response, restarting flow",
132                    keep_context=keep_context,
133                )
134                return self.executor.restart_flow(keep_context)
135            with (
136                start_span(
137                    op="authentik.flow.stage.challenge_invalid",
138                    name=self.__class__.__name__,
139                ),
140                HIST_FLOWS_STAGE_TIME.labels(
141                    stage_type=self.__class__.__name__, method="challenge_invalid"
142                ).time(),
143            ):
144                return self.challenge_invalid(challenge)
145        with (
146            start_span(
147                op="authentik.flow.stage.challenge_valid",
148                name=self.__class__.__name__,
149            ),
150            HIST_FLOWS_STAGE_TIME.labels(
151                stage_type=self.__class__.__name__, method="challenge_valid"
152            ).time(),
153        ):
154            return self.challenge_valid(challenge)

Handle challenge response

def format_title(self) -> str:
156    def format_title(self) -> str:
157        """Allow usage of placeholder in flow title."""
158        if not self.executor.plan:
159            return self.executor.flow.title
160        try:
161            return self.executor.flow.title % {
162                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
163                "user": self.get_pending_user(for_display=True),
164            }
165
166        except Exception as exc:  # noqa
167            self.logger.warning("failed to template title", exc=exc)
168            return self.executor.flow.title

Allow usage of placeholder in flow title.

cancel_url: str
170    @property
171    def cancel_url(self) -> str:
172        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
173
174        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
175        url = reverse("authentik_flows:cancel")
176        if next_param:
177            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
178        return url
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
224    def get_challenge(self, *args, **kwargs) -> Challenge:
225        """Return the challenge that the client should solve"""
226        raise NotImplementedError

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
228    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
229        """Callback when the challenge has the correct format"""
230        raise NotImplementedError

Callback when the challenge has the correct format

def challenge_invalid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
232    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
233        """Callback when the challenge has the incorrect format"""
234        challenge_response = self._get_challenge()
235        full_errors = {}
236        for field, errors in response.errors.items():
237            for error in errors:
238                full_errors.setdefault(field, [])
239                field_error = {
240                    "string": str(error),
241                }
242                if hasattr(error, "code"):
243                    field_error["code"] = error.code
244                full_errors[field].append(field_error)
245        challenge_response.initial_data["response_errors"] = full_errors
246        if not challenge_response.is_valid():
247            if settings.TEST:
248                raise StageInvalidException(
249                    (
250                        f"Invalid challenge response: \n\t{challenge_response.errors}"
251                        f"\n\nValidated data:\n\t {challenge_response.data}"
252                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
253                    ),
254                )
255            self.logger.error(
256                "f(ch): invalid challenge response",
257                errors=challenge_response.errors,
258            )
259        return HttpChallengeResponse(challenge_response)

Callback when the challenge has the incorrect format

class AccessDeniedStage(ChallengeStageView):
class AccessDeniedStage(ChallengeStageView):
    """Stage that presents an access-denied challenge.

    Used internally by FlowExecutor's stage_invalid().
    """

    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Keep the optional error message; remaining arguments go to the base class."""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the access-denied challenge; a missing message becomes "Unknown error"."""
        message = self.error_message or "Unknown error"
        payload = {
            "error_message": str(message),
            "component": "ak-stage-access-denied",
        }
        return AccessDeniedChallenge(data=payload)

    # Not reachable in practice: the challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        executor = self.executor
        return executor.cancel()

Used internally by FlowExecutor's stage_invalid()

AccessDeniedStage( executor: authentik.flows.views.executor.FlowExecutorView, error_message: str | None = None, **kwargs)
267    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
268        super().__init__(executor, **kwargs)
269        self.error_message = error_message

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

error_message: str | None
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
271    def get_challenge(self, *args, **kwargs) -> Challenge:
272        return AccessDeniedChallenge(
273            data={
274                "error_message": str(self.error_message or "Unknown error"),
275                "component": "ak-stage-access-denied",
276            }
277        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
281    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
282        return self.executor.cancel()

Callback when the challenge has the correct format

class RedirectStage(ChallengeStageView):
class RedirectStage(ChallengeStageView):
    """Stage that answers with a redirect to an arbitrary URL."""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Build a redirect challenge to the current stage's `destination`.

        If the current stage has no `destination` attribute, the URL of
        `authentik_core:root-redirect` is used.
        """
        stage = self.executor.current_stage
        fallback = reverse("authentik_core:root-redirect")
        target = getattr(stage, "destination", fallback)
        return RedirectChallenge(data={"to": target})

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Answer a valid response by issuing the redirect challenge again."""
        redirect = self.get_challenge()
        return HttpChallengeResponse(redirect)

Redirect to any URL

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.RedirectChallenge:
288    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
289        destination = getattr(
290            self.executor.current_stage, "destination", reverse("authentik_core:root-redirect")
291        )
292        return RedirectChallenge(
293            data={
294                "to": destination,
295            }
296        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
298    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
299        return HttpChallengeResponse(self.get_challenge())

Callback when the challenge has the correct format

class SessionEndStage(ChallengeStageView):
class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build either a redirect or the session-end challenge.

        Checked in this order:

        1. A post_logout_redirect_uri in the plan context: redirect to it.
        2. The request user is not authenticated: redirect to
           `authentik_core:root-redirect`.
        3. Otherwise: a SessionEndChallenge with the brand name and, where
           applicable, overview, application and invalidation flow details.
        """
        context = self.executor.plan.context

        # An OIDC post_logout_redirect_uri in the context wins over everything else
        logout_target = context.get(PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI)
        if logout_target:
            self.logger.debug(
                "SessionEndStage redirecting to post_logout_redirect_uri",
                redirect_url=logout_target,
            )
            return RedirectChallenge(data={"to": logout_target})

        # Already logged out and nowhere specific to go: use the default redirect
        if not self.request.user.is_authenticated:
            return RedirectChallenge(data={"to": reverse("authentik_core:root-redirect")})

        application: Application | None = context.get(PLAN_CONTEXT_APPLICATION)
        brand = self.request.brand
        payload = {
            "component": "ak-stage-session-end",
            "brand_name": brand.branding_title,
        }
        # The overview link is only added for internal users
        if self.get_pending_user().type == UserTypes.INTERNAL:
            overview_path = reverse("authentik_core:root-redirect")
            payload["overview_url"] = self.request.build_absolute_uri(overview_path)
        if application:
            payload["application_name"] = application.name
            pending_user = self.get_pending_user()
            payload["application_launch_url"] = application.get_launch_url(pending_user)
        invalidation_flow = brand.flow_invalidation
        if invalidation_flow:
            payload["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs={"flow_slug": invalidation_flow.slug},
            )

        return SessionEndChallenge(data=payload)

    # Not reachable in practice: the challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Cancel the flow through the executor."""
        executor = self.executor
        return executor.cancel()

Stage inserted when a flow is used as invalidation flow. By default shows actions that the user is likely to take after signing out of a provider.

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
306    def get_challenge(self, *args, **kwargs) -> Challenge:
307        # Check for OIDC post_logout_redirect_uri in context
308        post_logout_redirect_uri = self.executor.plan.context.get(
309            PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI
310        )
311
312        if post_logout_redirect_uri:
313            self.logger.debug(
314                "SessionEndStage redirecting to post_logout_redirect_uri",
315                redirect_url=post_logout_redirect_uri,
316            )
317            return RedirectChallenge(
318                data={
319                    "to": post_logout_redirect_uri,
320                },
321            )
322
323        if not self.request.user.is_authenticated:
324            # User is logged out with no redirect URI - go to default
325            return RedirectChallenge(
326                data={
327                    "to": reverse("authentik_core:root-redirect"),
328                },
329            )
330        application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
331        data = {
332            "component": "ak-stage-session-end",
333            "brand_name": self.request.brand.branding_title,
334        }
335        if self.get_pending_user().type == UserTypes.INTERNAL:
336            data["overview_url"] = self.request.build_absolute_uri(
337                reverse("authentik_core:root-redirect")
338            )
339        if application:
340            data["application_name"] = application.name
341            data["application_launch_url"] = application.get_launch_url(self.get_pending_user())
342        if self.request.brand.flow_invalidation:
343            data["invalidation_flow_url"] = reverse(
344                "authentik_core:if-flow",
345                kwargs={
346                    "flow_slug": self.request.brand.flow_invalidation.slug,
347                },
348            )
349
350        return SessionEndChallenge(data=data)

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
354    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
355        return self.executor.cancel()

Callback when the challenge has the correct format