authentik.flows.stage

authentik stage Base view

  1"""authentik stage Base view"""
  2
  3from typing import TYPE_CHECKING
  4from urllib.parse import urlencode
  5
  6from django.conf import settings
  7from django.contrib.auth.models import AnonymousUser
  8from django.http import HttpRequest
  9from django.http.request import QueryDict
 10from django.http.response import HttpResponse
 11from django.urls import reverse
 12from django.views.generic.base import View
 13from prometheus_client import Histogram
 14from rest_framework.request import Request
 15from sentry_sdk import start_span
 16from structlog.stdlib import BoundLogger, get_logger
 17
 18from authentik.core.models import Application, User
 19from authentik.flows.challenge import (
 20    AccessDeniedChallenge,
 21    Challenge,
 22    ChallengeResponse,
 23    ContextualFlowInfo,
 24    HttpChallengeResponse,
 25    RedirectChallenge,
 26    SessionEndChallenge,
 27    WithUserInfoChallenge,
 28)
 29from authentik.flows.exceptions import StageInvalidException
 30from authentik.flows.models import InvalidResponseAction
 31from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
 32from authentik.lib.avatars import DEFAULT_AVATAR, get_avatar
 33from authentik.lib.utils.reflection import class_to_path
 34
 35if TYPE_CHECKING:
 36    from authentik.flows.views.executor import FlowExecutorView
 37
 38PLAN_CONTEXT_PENDING_USER_IDENTIFIER = "pending_user_identifier"
 39HIST_FLOWS_STAGE_TIME = Histogram(
 40    "authentik_flows_stage_time",
 41    "Duration taken by different parts of stages",
 42    ["stage_type", "method"],
 43)
 44
 45
 46class StageView(View):
 47    """Abstract Stage"""
 48
 49    executor: FlowExecutorView
 50
 51    request: HttpRequest = None
 52
 53    logger: BoundLogger
 54
 55    def __init__(self, executor: FlowExecutorView, **kwargs):
 56        self.executor = executor
 57        current_stage = getattr(self.executor, "current_stage", None)
 58        self.logger = get_logger().bind(
 59            stage=getattr(current_stage, "name", None),
 60            stage_view=class_to_path(type(self)),
 61        )
 62        super().__init__(**kwargs)
 63
 64    def get_pending_user(self, for_display=False) -> User:
 65        """Either show the matched User object or show what the user entered,
 66        based on what the earlier stage (mostly IdentificationStage) set.
 67        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
 68        other things besides the form display.
 69
 70        If no user is pending, returns request.user"""
 71        if not self.executor.plan:
 72            return self.request.user
 73        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
 74            return User(
 75                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
 76                email="",
 77            )
 78        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
 79            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
 80        return self.request.user
 81
 82    def cleanup(self):
 83        """Cleanup session"""
 84
 85
 86class ChallengeStageView(StageView):
 87    """Stage view which response with a challenge"""
 88
 89    response_class = ChallengeResponse
 90
 91    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 92        """Return the response class type"""
 93        return self.response_class(None, data=data, stage=self)
 94
 95    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 96        """Return a challenge for the frontend to solve"""
 97        try:
 98            challenge = self._get_challenge(*args, **kwargs)
 99        except StageInvalidException as exc:
100            self.logger.debug("Got StageInvalidException", exc=exc)
101            return self.executor.stage_invalid()
102        if not challenge.is_valid():
103            self.logger.error(
104                "f(ch): Invalid challenge",
105                errors=challenge.errors,
106                challenge=challenge.data,
107            )
108        return HttpChallengeResponse(challenge)
109
110    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
111        """Handle challenge response"""
112        valid = False
113        try:
114            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
115            valid = challenge.is_valid()
116        except StageInvalidException as exc:
117            self.logger.debug("Got StageInvalidException", exc=exc)
118            return self.executor.stage_invalid()
119        if not valid:
120            if self.executor.current_binding.invalid_response_action in [
121                InvalidResponseAction.RESTART,
122                InvalidResponseAction.RESTART_WITH_CONTEXT,
123            ]:
124                keep_context = (
125                    self.executor.current_binding.invalid_response_action
126                    == InvalidResponseAction.RESTART_WITH_CONTEXT
127                )
128                self.logger.debug(
129                    "f(ch): Invalid response, restarting flow",
130                    keep_context=keep_context,
131                )
132                return self.executor.restart_flow(keep_context)
133            with (
134                start_span(
135                    op="authentik.flow.stage.challenge_invalid",
136                    name=self.__class__.__name__,
137                ),
138                HIST_FLOWS_STAGE_TIME.labels(
139                    stage_type=self.__class__.__name__, method="challenge_invalid"
140                ).time(),
141            ):
142                return self.challenge_invalid(challenge)
143        with (
144            start_span(
145                op="authentik.flow.stage.challenge_valid",
146                name=self.__class__.__name__,
147            ),
148            HIST_FLOWS_STAGE_TIME.labels(
149                stage_type=self.__class__.__name__, method="challenge_valid"
150            ).time(),
151        ):
152            return self.challenge_valid(challenge)
153
154    def format_title(self) -> str:
155        """Allow usage of placeholder in flow title."""
156        if not self.executor.plan:
157            return self.executor.flow.title
158        try:
159            return self.executor.flow.title % {
160                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
161                "user": self.get_pending_user(for_display=True),
162            }
163
164        except Exception as exc:  # noqa
165            self.logger.warning("failed to template title", exc=exc)
166            return self.executor.flow.title
167
168    @property
169    def cancel_url(self) -> str:
170        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
171
172        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
173        url = reverse("authentik_flows:cancel")
174        if next_param:
175            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
176        return url
177
178    def _get_challenge(self, *args, **kwargs) -> Challenge:
179        with (
180            start_span(
181                op="authentik.flow.stage.get_challenge",
182                name=self.__class__.__name__,
183            ),
184            HIST_FLOWS_STAGE_TIME.labels(
185                stage_type=self.__class__.__name__, method="get_challenge"
186            ).time(),
187        ):
188            challenge = self.get_challenge(*args, **kwargs)
189        with start_span(
190            op="authentik.flow.stage._get_challenge",
191            name=self.__class__.__name__,
192        ):
193            if not hasattr(challenge, "initial_data"):
194                challenge.initial_data = {}
195            if "flow_info" not in challenge.initial_data:
196                flow_info = ContextualFlowInfo(
197                    data={
198                        "title": self.format_title(),
199                        "background": self.executor.flow.background_url(self.request),
200                        "background_themed_urls": self.executor.flow.background_themed_urls(
201                            self.request
202                        ),
203                        "cancel_url": self.cancel_url,
204                        "layout": self.executor.flow.layout,
205                    }
206                )
207                flow_info.is_valid()
208                challenge.initial_data["flow_info"] = flow_info.data
209            if isinstance(challenge, WithUserInfoChallenge):
210                # If there's a pending user, update the `username` field
211                # this field is only used by password managers.
212                # If there's no user set, an error is raised later.
213                if user := self.get_pending_user(for_display=True):
214                    challenge.initial_data["pending_user"] = user.username
215                challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
216                if not isinstance(user, AnonymousUser):
217                    challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request)
218        return challenge
219
220    def get_challenge(self, *args, **kwargs) -> Challenge:
221        """Return the challenge that the client should solve"""
222        raise NotImplementedError
223
224    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
225        """Callback when the challenge has the correct format"""
226        raise NotImplementedError
227
228    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
229        """Callback when the challenge has the incorrect format"""
230        challenge_response = self._get_challenge()
231        full_errors = {}
232        for field, errors in response.errors.items():
233            for error in errors:
234                full_errors.setdefault(field, [])
235                field_error = {
236                    "string": str(error),
237                }
238                if hasattr(error, "code"):
239                    field_error["code"] = error.code
240                full_errors[field].append(field_error)
241        challenge_response.initial_data["response_errors"] = full_errors
242        if not challenge_response.is_valid():
243            if settings.TEST:
244                raise StageInvalidException(
245                    (
246                        f"Invalid challenge response: \n\t{challenge_response.errors}"
247                        f"\n\nValidated data:\n\t {challenge_response.data}"
248                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
249                    ),
250                )
251            self.logger.error(
252                "f(ch): invalid challenge response",
253                errors=challenge_response.errors,
254            )
255        return HttpChallengeResponse(challenge_response)
256
257
class AccessDeniedStage(ChallengeStageView):
    """Stage presenting an access-denied challenge.

    Used internally by FlowExecutor's stage_invalid().
    """

    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Initialise the stage and remember the optional message to show."""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the access-denied challenge, falling back to a generic message."""
        message = self.error_message or "Unknown error"
        payload = {
            "error_message": str(message),
            "component": "ak-stage-access-denied",
        }
        return AccessDeniedChallenge(data=payload)

    # Unreachable in practice: this challenge is created on demand and only its
    # .get() method is ever called.
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        return self.executor.cancel()
279
280
class RedirectStage(ChallengeStageView):
    """Stage that redirects the client to a URL."""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Return a redirect to the current stage's ``destination`` attribute.

        When the current stage has no such attribute, the root redirect URL is used.
        """
        stage = self.executor.current_stage
        fallback = reverse("authentik_core:root-redirect")
        target = getattr(stage, "destination", fallback)
        return RedirectChallenge(data={"to": target})

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Answer a submitted response with the same redirect challenge again."""
        redirect = self.get_challenge()
        return HttpChallengeResponse(redirect)
296
297
class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the session-end challenge, or a redirect for unauthenticated users.

        The challenge always carries the brand's title. Application name and launch
        URL are added when the plan context has an application, and the invalidation
        flow URL is added when the brand has an invalidation flow configured.
        """
        request = self.request
        if not request.user.is_authenticated:
            return RedirectChallenge(data={"to": reverse("authentik_core:root-redirect")})

        application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
        brand = request.brand
        payload = {
            "component": "ak-stage-session-end",
            "brand_name": brand.branding_title,
        }
        if application:
            payload["application_name"] = application.name
            payload["application_launch_url"] = application.get_launch_url(
                self.get_pending_user()
            )
        invalidation_flow = brand.flow_invalidation
        if invalidation_flow:
            payload["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs={"flow_slug": invalidation_flow.slug},
            )

        return SessionEndChallenge(data=payload)

    # Unreachable in practice: this challenge is created on demand and only its
    # .get() method is ever called.
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        return self.executor.cancel()
PLAN_CONTEXT_PENDING_USER_IDENTIFIER = 'pending_user_identifier'
HIST_FLOWS_STAGE_TIME = prometheus_client.metrics.Histogram(authentik_flows_stage_time)
class StageView(django.views.generic.base.View):
47class StageView(View):
48    """Abstract Stage"""
49
50    executor: FlowExecutorView
51
52    request: HttpRequest = None
53
54    logger: BoundLogger
55
56    def __init__(self, executor: FlowExecutorView, **kwargs):
57        self.executor = executor
58        current_stage = getattr(self.executor, "current_stage", None)
59        self.logger = get_logger().bind(
60            stage=getattr(current_stage, "name", None),
61            stage_view=class_to_path(type(self)),
62        )
63        super().__init__(**kwargs)
64
65    def get_pending_user(self, for_display=False) -> User:
66        """Either show the matched User object or show what the user entered,
67        based on what the earlier stage (mostly IdentificationStage) set.
68        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
69        other things besides the form display.
70
71        If no user is pending, returns request.user"""
72        if not self.executor.plan:
73            return self.request.user
74        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
75            return User(
76                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
77                email="",
78            )
79        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
80            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
81        return self.request.user
82
83    def cleanup(self):
84        """Cleanup session"""

Abstract Stage

StageView(executor: authentik.flows.views.executor.FlowExecutorView, **kwargs)
56    def __init__(self, executor: FlowExecutorView, **kwargs):
57        self.executor = executor
58        current_stage = getattr(self.executor, "current_stage", None)
59        self.logger = get_logger().bind(
60            stage=getattr(current_stage, "name", None),
61            stage_view=class_to_path(type(self)),
62        )
63        super().__init__(**kwargs)

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

request: django.http.request.HttpRequest = None
logger: structlog.stdlib.BoundLogger
def get_pending_user(self, for_display=False) -> authentik.core.models.User:
65    def get_pending_user(self, for_display=False) -> User:
66        """Either show the matched User object or show what the user entered,
67        based on what the earlier stage (mostly IdentificationStage) set.
68        _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for
69        other things besides the form display.
70
71        If no user is pending, returns request.user"""
72        if not self.executor.plan:
73            return self.request.user
74        if PLAN_CONTEXT_PENDING_USER_IDENTIFIER in self.executor.plan.context and for_display:
75            return User(
76                username=self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER),
77                email="",
78            )
79        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
80            return self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
81        return self.request.user

Either show the matched User object or show what the user entered, based on what the earlier stage (mostly IdentificationStage) set. _USER_IDENTIFIER overrides the first User, as PENDING_USER is used for other things besides the form display.

If no user is pending, returns request.user

def cleanup(self):
83    def cleanup(self):
84        """Cleanup session"""

Cleanup session

class ChallengeStageView(StageView):
 87class ChallengeStageView(StageView):
 88    """Stage view which response with a challenge"""
 89
 90    response_class = ChallengeResponse
 91
 92    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
 93        """Return the response class type"""
 94        return self.response_class(None, data=data, stage=self)
 95
 96    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 97        """Return a challenge for the frontend to solve"""
 98        try:
 99            challenge = self._get_challenge(*args, **kwargs)
100        except StageInvalidException as exc:
101            self.logger.debug("Got StageInvalidException", exc=exc)
102            return self.executor.stage_invalid()
103        if not challenge.is_valid():
104            self.logger.error(
105                "f(ch): Invalid challenge",
106                errors=challenge.errors,
107                challenge=challenge.data,
108            )
109        return HttpChallengeResponse(challenge)
110
111    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
112        """Handle challenge response"""
113        valid = False
114        try:
115            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
116            valid = challenge.is_valid()
117        except StageInvalidException as exc:
118            self.logger.debug("Got StageInvalidException", exc=exc)
119            return self.executor.stage_invalid()
120        if not valid:
121            if self.executor.current_binding.invalid_response_action in [
122                InvalidResponseAction.RESTART,
123                InvalidResponseAction.RESTART_WITH_CONTEXT,
124            ]:
125                keep_context = (
126                    self.executor.current_binding.invalid_response_action
127                    == InvalidResponseAction.RESTART_WITH_CONTEXT
128                )
129                self.logger.debug(
130                    "f(ch): Invalid response, restarting flow",
131                    keep_context=keep_context,
132                )
133                return self.executor.restart_flow(keep_context)
134            with (
135                start_span(
136                    op="authentik.flow.stage.challenge_invalid",
137                    name=self.__class__.__name__,
138                ),
139                HIST_FLOWS_STAGE_TIME.labels(
140                    stage_type=self.__class__.__name__, method="challenge_invalid"
141                ).time(),
142            ):
143                return self.challenge_invalid(challenge)
144        with (
145            start_span(
146                op="authentik.flow.stage.challenge_valid",
147                name=self.__class__.__name__,
148            ),
149            HIST_FLOWS_STAGE_TIME.labels(
150                stage_type=self.__class__.__name__, method="challenge_valid"
151            ).time(),
152        ):
153            return self.challenge_valid(challenge)
154
155    def format_title(self) -> str:
156        """Allow usage of placeholder in flow title."""
157        if not self.executor.plan:
158            return self.executor.flow.title
159        try:
160            return self.executor.flow.title % {
161                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
162                "user": self.get_pending_user(for_display=True),
163            }
164
165        except Exception as exc:  # noqa
166            self.logger.warning("failed to template title", exc=exc)
167            return self.executor.flow.title
168
169    @property
170    def cancel_url(self) -> str:
171        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
172
173        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
174        url = reverse("authentik_flows:cancel")
175        if next_param:
176            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
177        return url
178
179    def _get_challenge(self, *args, **kwargs) -> Challenge:
180        with (
181            start_span(
182                op="authentik.flow.stage.get_challenge",
183                name=self.__class__.__name__,
184            ),
185            HIST_FLOWS_STAGE_TIME.labels(
186                stage_type=self.__class__.__name__, method="get_challenge"
187            ).time(),
188        ):
189            challenge = self.get_challenge(*args, **kwargs)
190        with start_span(
191            op="authentik.flow.stage._get_challenge",
192            name=self.__class__.__name__,
193        ):
194            if not hasattr(challenge, "initial_data"):
195                challenge.initial_data = {}
196            if "flow_info" not in challenge.initial_data:
197                flow_info = ContextualFlowInfo(
198                    data={
199                        "title": self.format_title(),
200                        "background": self.executor.flow.background_url(self.request),
201                        "background_themed_urls": self.executor.flow.background_themed_urls(
202                            self.request
203                        ),
204                        "cancel_url": self.cancel_url,
205                        "layout": self.executor.flow.layout,
206                    }
207                )
208                flow_info.is_valid()
209                challenge.initial_data["flow_info"] = flow_info.data
210            if isinstance(challenge, WithUserInfoChallenge):
211                # If there's a pending user, update the `username` field
212                # this field is only used by password managers.
213                # If there's no user set, an error is raised later.
214                if user := self.get_pending_user(for_display=True):
215                    challenge.initial_data["pending_user"] = user.username
216                challenge.initial_data["pending_user_avatar"] = DEFAULT_AVATAR
217                if not isinstance(user, AnonymousUser):
218                    challenge.initial_data["pending_user_avatar"] = get_avatar(user, self.request)
219        return challenge
220
221    def get_challenge(self, *args, **kwargs) -> Challenge:
222        """Return the challenge that the client should solve"""
223        raise NotImplementedError
224
225    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
226        """Callback when the challenge has the correct format"""
227        raise NotImplementedError
228
229    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
230        """Callback when the challenge has the incorrect format"""
231        challenge_response = self._get_challenge()
232        full_errors = {}
233        for field, errors in response.errors.items():
234            for error in errors:
235                full_errors.setdefault(field, [])
236                field_error = {
237                    "string": str(error),
238                }
239                if hasattr(error, "code"):
240                    field_error["code"] = error.code
241                full_errors[field].append(field_error)
242        challenge_response.initial_data["response_errors"] = full_errors
243        if not challenge_response.is_valid():
244            if settings.TEST:
245                raise StageInvalidException(
246                    (
247                        f"Invalid challenge response: \n\t{challenge_response.errors}"
248                        f"\n\nValidated data:\n\t {challenge_response.data}"
249                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
250                    ),
251                )
252            self.logger.error(
253                "f(ch): invalid challenge response",
254                errors=challenge_response.errors,
255            )
256        return HttpChallengeResponse(challenge_response)

Stage view which responds with a challenge

response_class = <class 'authentik.flows.challenge.ChallengeResponse'>
def get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
92    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
93        """Return the response class type"""
94        return self.response_class(None, data=data, stage=self)

Return an instance of the response class, bound to the submitted data and this stage

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
 96    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
 97        """Return a challenge for the frontend to solve"""
 98        try:
 99            challenge = self._get_challenge(*args, **kwargs)
100        except StageInvalidException as exc:
101            self.logger.debug("Got StageInvalidException", exc=exc)
102            return self.executor.stage_invalid()
103        if not challenge.is_valid():
104            self.logger.error(
105                "f(ch): Invalid challenge",
106                errors=challenge.errors,
107                challenge=challenge.data,
108            )
109        return HttpChallengeResponse(challenge)

Return a challenge for the frontend to solve

def post( self, request: rest_framework.request.Request, *args, **kwargs) -> django.http.response.HttpResponse:
111    def post(self, request: Request, *args, **kwargs) -> HttpResponse:
112        """Handle challenge response"""
113        valid = False
114        try:
115            challenge: ChallengeResponse = self.get_response_instance(data=request.data)
116            valid = challenge.is_valid()
117        except StageInvalidException as exc:
118            self.logger.debug("Got StageInvalidException", exc=exc)
119            return self.executor.stage_invalid()
120        if not valid:
121            if self.executor.current_binding.invalid_response_action in [
122                InvalidResponseAction.RESTART,
123                InvalidResponseAction.RESTART_WITH_CONTEXT,
124            ]:
125                keep_context = (
126                    self.executor.current_binding.invalid_response_action
127                    == InvalidResponseAction.RESTART_WITH_CONTEXT
128                )
129                self.logger.debug(
130                    "f(ch): Invalid response, restarting flow",
131                    keep_context=keep_context,
132                )
133                return self.executor.restart_flow(keep_context)
134            with (
135                start_span(
136                    op="authentik.flow.stage.challenge_invalid",
137                    name=self.__class__.__name__,
138                ),
139                HIST_FLOWS_STAGE_TIME.labels(
140                    stage_type=self.__class__.__name__, method="challenge_invalid"
141                ).time(),
142            ):
143                return self.challenge_invalid(challenge)
144        with (
145            start_span(
146                op="authentik.flow.stage.challenge_valid",
147                name=self.__class__.__name__,
148            ),
149            HIST_FLOWS_STAGE_TIME.labels(
150                stage_type=self.__class__.__name__, method="challenge_valid"
151            ).time(),
152        ):
153            return self.challenge_valid(challenge)

Handle challenge response

def format_title(self) -> str:
155    def format_title(self) -> str:
156        """Allow usage of placeholder in flow title."""
157        if not self.executor.plan:
158            return self.executor.flow.title
159        try:
160            return self.executor.flow.title % {
161                "app": self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION, ""),
162                "user": self.get_pending_user(for_display=True),
163            }
164
165        except Exception as exc:  # noqa
166            self.logger.warning("failed to template title", exc=exc)
167            return self.executor.flow.title

Allow usage of placeholder in flow title.

cancel_url: str
169    @property
170    def cancel_url(self) -> str:
171        from authentik.flows.views.executor import NEXT_ARG_NAME, SESSION_KEY_GET
172
173        next_param = self.request.session.get(SESSION_KEY_GET, {}).get(NEXT_ARG_NAME)
174        url = reverse("authentik_flows:cancel")
175        if next_param:
176            return f"{url}?{urlencode({NEXT_ARG_NAME: next_param})}"
177        return url
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
221    def get_challenge(self, *args, **kwargs) -> Challenge:
222        """Return the challenge that the client should solve"""
223        raise NotImplementedError

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
225    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
226        """Callback when the challenge has the correct format"""
227        raise NotImplementedError

Callback when the challenge has the correct format

def challenge_invalid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
229    def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse:
230        """Callback when the challenge has the incorrect format"""
231        challenge_response = self._get_challenge()
232        full_errors = {}
233        for field, errors in response.errors.items():
234            for error in errors:
235                full_errors.setdefault(field, [])
236                field_error = {
237                    "string": str(error),
238                }
239                if hasattr(error, "code"):
240                    field_error["code"] = error.code
241                full_errors[field].append(field_error)
242        challenge_response.initial_data["response_errors"] = full_errors
243        if not challenge_response.is_valid():
244            if settings.TEST:
245                raise StageInvalidException(
246                    (
247                        f"Invalid challenge response: \n\t{challenge_response.errors}"
248                        f"\n\nValidated data:\n\t {challenge_response.data}"
249                        f"\n\nInitial data:\n\t {challenge_response.initial_data}"
250                    ),
251                )
252            self.logger.error(
253                "f(ch): invalid challenge response",
254                errors=challenge_response.errors,
255            )
256        return HttpChallengeResponse(challenge_response)

Callback when the challenge has the incorrect format

class AccessDeniedStage(ChallengeStageView):
class AccessDeniedStage(ChallengeStageView):
    """Used internally by FlowExecutor's stage_invalid()"""

    # Message placed in the challenge; "Unknown error" is sent when it is unset or empty
    error_message: str | None

    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
        """Initialise the base stage view and remember the optional error message"""
        super().__init__(executor, **kwargs)
        self.error_message = error_message

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return an access-denied challenge carrying the stored error message"""
        message = self.error_message or "Unknown error"
        payload = {
            "error_message": str(message),
            "component": "ak-stage-access-denied",
        }
        return AccessDeniedChallenge(data=payload)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Delegate to the executor's `cancel()` and return its result"""
        return self.executor.cancel()

Used internally by FlowExecutor's stage_invalid()

AccessDeniedStage( executor: authentik.flows.views.executor.FlowExecutorView, error_message: str | None = None, **kwargs)
264    def __init__(self, executor: FlowExecutorView, error_message: str | None = None, **kwargs):
265        super().__init__(executor, **kwargs)
266        self.error_message = error_message

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

error_message: str | None
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
268    def get_challenge(self, *args, **kwargs) -> Challenge:
269        return AccessDeniedChallenge(
270            data={
271                "error_message": str(self.error_message or "Unknown error"),
272                "component": "ak-stage-access-denied",
273            }
274        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
278    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
279        return self.executor.cancel()

Callback when the challenge has the correct format

class RedirectStage(ChallengeStageView):
class RedirectStage(ChallengeStageView):
    """Redirect to any URL"""

    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
        """Return a redirect to the current stage's `destination` attribute,
        using the `authentik_core:root-redirect` URL when it has none"""
        stage = self.executor.current_stage
        fallback = reverse("authentik_core:root-redirect")
        target = getattr(stage, "destination", fallback)
        return RedirectChallenge(data={"to": target})

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Respond to a valid response with a freshly built redirect challenge"""
        challenge = self.get_challenge()
        return HttpChallengeResponse(challenge)

Redirect to any URL

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.RedirectChallenge:
285    def get_challenge(self, *args, **kwargs) -> RedirectChallenge:
286        destination = getattr(
287            self.executor.current_stage, "destination", reverse("authentik_core:root-redirect")
288        )
289        return RedirectChallenge(
290            data={
291                "to": destination,
292            }
293        )

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
295    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
296        return HttpChallengeResponse(self.get_challenge())

Callback when the challenge has the correct format

class SessionEndStage(ChallengeStageView):
class SessionEndStage(ChallengeStageView):
    """Stage inserted when a flow is used as invalidation flow. By default shows actions
    that the user is likely to take after signing out of a provider."""

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Return the session-end challenge, or a redirect to the root URL when the
        requesting user is not authenticated"""
        if not self.request.user.is_authenticated:
            root_url = reverse("authentik_core:root-redirect")
            return RedirectChallenge(data={"to": root_url})

        app = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
        brand = self.request.brand
        payload = {
            "component": "ak-stage-session-end",
            "brand_name": brand.branding_title,
        }
        # Application details are only included when the plan context carries one
        if app:
            payload["application_name"] = app.name
            payload["application_launch_url"] = app.get_launch_url(self.get_pending_user())
        # Link to the brand's invalidation flow when the brand has one set
        if brand.flow_invalidation:
            flow_kwargs = {"flow_slug": brand.flow_invalidation.slug}
            payload["invalidation_flow_url"] = reverse(
                "authentik_core:if-flow",
                kwargs=flow_kwargs,
            )

        return SessionEndChallenge(data=payload)

    # This can never be reached since this challenge is created on demand and only the
    # .get() method is called
    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
        """Delegate to the executor's `cancel()` and return its result"""
        return self.executor.cancel()

Stage inserted when a flow is used as invalidation flow. By default shows actions that the user is likely to take after signing out of a provider.

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
303    def get_challenge(self, *args, **kwargs) -> Challenge:
304        if not self.request.user.is_authenticated:
305            return RedirectChallenge(
306                data={
307                    "to": reverse("authentik_core:root-redirect"),
308                },
309            )
310        application: Application | None = self.executor.plan.context.get(PLAN_CONTEXT_APPLICATION)
311        data = {
312            "component": "ak-stage-session-end",
313            "brand_name": self.request.brand.branding_title,
314        }
315        if application:
316            data["application_name"] = application.name
317            data["application_launch_url"] = application.get_launch_url(self.get_pending_user())
318        if self.request.brand.flow_invalidation:
319            data["invalidation_flow_url"] = reverse(
320                "authentik_core:if-flow",
321                kwargs={
322                    "flow_slug": self.request.brand.flow_invalidation.slug,
323                },
324            )
325
326        return SessionEndChallenge(data=data)

Return the challenge that the client should solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
330    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:  # pragma: no cover
331        return self.executor.cancel()

Callback when the challenge has the correct format