authentik.stages.prompt.stage

Prompt Stage Logic

  1"""Prompt Stage Logic"""
  2
  3from collections.abc import Callable
  4from email.policy import Policy
  5from types import MethodType
  6from typing import Any
  7
  8from django.contrib.messages import INFO, add_message
  9from django.db.models.query import QuerySet
 10from django.http import HttpRequest, HttpResponse
 11from django.http.request import QueryDict
 12from django.utils.translation import gettext_lazy as _
 13from rest_framework.fields import (
 14    BooleanField,
 15    CharField,
 16    ChoiceField,
 17    IntegerField,
 18    ListField,
 19    empty,
 20)
 21from rest_framework.serializers import ValidationError
 22
 23from authentik.core.api.utils import PassiveSerializer
 24from authentik.core.models import User
 25from authentik.flows.challenge import Challenge, ChallengeResponse
 26from authentik.flows.planner import FlowPlan
 27from authentik.flows.stage import ChallengeStageView
 28from authentik.policies.engine import PolicyEngine
 29from authentik.policies.models import PolicyBinding, PolicyBindingModel, PolicyEngineMode
 30from authentik.stages.prompt.models import FieldTypes, Prompt, PromptStage
 31from authentik.stages.prompt.signals import password_validate
 32
# Key in the flow plan's context under which the prompt values are kept: the stage view
# writes the validated response data there and the prompt fields read it back.
PLAN_CONTEXT_PROMPT = "prompt_data"
 34
 35
class PromptChoiceSerializer(PassiveSerializer):
    """Serializer for a single Choice field

    A choice is a value/label pair; both entries are mandatory."""

    # Value of the choice
    value = CharField(required=True)
    # Label of the choice
    label = CharField(required=True)
 41
 42
class StagePromptSerializer(PassiveSerializer):
    """Serializer for a single Prompt field

    Describes one prompt of the stage as it is sent to the client inside a
    `PromptChallenge`."""

    # Key of the prompt; the response serializer creates a field with this name
    field_key = CharField()
    # May be blank
    label = CharField(allow_blank=True)
    # Restricted to the values declared in FieldTypes
    type = ChoiceField(choices=FieldTypes.choices)
    required = BooleanField()
    # May be blank
    placeholder = CharField(allow_blank=True)
    # May be blank
    initial_value = CharField(allow_blank=True)
    order = IntegerField()
    # May be blank
    sub_text = CharField(allow_blank=True)
    # List of value/label pairs; may be empty or null
    choices = ListField(child=PromptChoiceSerializer(), allow_empty=True, allow_null=True)
 55
 56
class PromptChallenge(Challenge):
    """Initial challenge being sent, define fields"""

    # One serialized entry per prompt of the stage
    fields = StagePromptSerializer(many=True)
    # Component identifier; the response serializer declares the same default
    component = CharField(default="ak-stage-prompt")
 62
 63
 64class PromptChallengeResponse(ChallengeResponse):
 65    """Validate response, fields are dynamically created based
 66    on the stage"""
 67
 68    stage_instance: PromptStage
 69
 70    component = CharField(default="ak-stage-prompt")
 71
 72    def __init__(self, *args, **kwargs):
 73        stage: PromptStage = kwargs.pop("stage_instance", None)
 74        plan: FlowPlan = kwargs.pop("plan", None)
 75        request: HttpRequest = kwargs.pop("request", None)
 76        user: User = kwargs.pop("user", None)
 77        super().__init__(*args, **kwargs)
 78        self.stage_instance = stage
 79        self.plan = plan
 80        self.request = request
 81        if not self.stage_instance:
 82            return
 83        # list() is called so we only load the fields once
 84        fields = list(self.stage_instance.fields.all())
 85        for field in fields:
 86            field: Prompt
 87            choices = field.get_choices(
 88                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 89            )
 90            current = field.get_initial_value(
 91                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 92            )
 93            self.fields[field.field_key] = field.field(current, choices)
 94            # Special handling for fields with username type
 95            # these check for existing users with the same username
 96            if field.type == FieldTypes.USERNAME:
 97                setattr(
 98                    self,
 99                    f"validate_{field.field_key}",
100                    MethodType(username_field_validator_factory(), self),
101                )
102            # Check if we have a password field, add a handler that sends a signal
103            # to validate it
104            if field.type == FieldTypes.PASSWORD:
105                setattr(
106                    self,
107                    f"validate_{field.field_key}",
108                    MethodType(password_single_validator_factory(), self),
109                )
110
111        self.field_order = sorted(fields, key=lambda x: x.order)
112
113    def _validate_password_fields(self, *field_names):
114        """Check if the value of all password fields match by merging them into a set
115        and checking the length"""
116        all_passwords = {self.initial_data[x] for x in field_names}
117        if len(all_passwords) > 1:
118            raise ValidationError(_("Passwords don't match."))
119
120    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
121        # Check if we have any static or hidden fields, and ensure they
122        # still have the same value
123        static_hidden_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
124            type__in=[
125                FieldTypes.HIDDEN,
126                FieldTypes.STATIC,
127                FieldTypes.ALERT_INFO,
128                FieldTypes.ALERT_WARNING,
129                FieldTypes.ALERT_DANGER,
130                FieldTypes.TEXT_READ_ONLY,
131                FieldTypes.TEXT_AREA_READ_ONLY,
132            ]
133        )
134        for static_hidden in static_hidden_fields:
135            field = self.fields[static_hidden.field_key]
136            default = field.default
137            # Prevent rest_framework.fields.empty from ending up in policies and events
138            if default == empty:
139                default = ""
140            attrs[static_hidden.field_key] = default
141
142        # Check if we have two password fields, and make sure they are the same
143        password_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
144            type=FieldTypes.PASSWORD
145        )
146        if password_fields.exists() and password_fields.count() == 2:  # noqa: PLR2004
147            self._validate_password_fields(*[field.field_key for field in password_fields])
148
149        engine = ListPolicyEngine(
150            self.stage_instance.validation_policies.all(),
151            self.stage.get_pending_user(),
152            self.request,
153        )
154        engine.mode = PolicyEngineMode.MODE_ALL
155        engine.request.context[PLAN_CONTEXT_PROMPT] = attrs
156        engine.use_cache = False
157        engine.build()
158        result = engine.result
159        if not result.passing:
160            raise ValidationError(list(result.messages))
161        else:
162            for msg in result.messages:
163                add_message(self.request, INFO, msg)
164        return attrs
165
166
def username_field_validator_factory() -> Callable[[PromptChallengeResponse, str], Any]:
    """Return a `validate_<field_key>` method for a username prompt.

    The returned function is meant to be bound to a `PromptChallengeResponse`
    instance; it checks if the username is taken already."""

    def username_field_validator(self: PromptChallengeResponse, value: str) -> Any:
        """Check for duplicate usernames

        Returns `value` unchanged, raises `ValidationError` if a user with that
        username already exists."""
        pending_user = self.stage.get_pending_user()
        query = User.objects.all()
        # A pending user that already has a primary key may keep its current username
        if pending_user.pk:
            query = query.exclude(username=pending_user.username)
        if query.filter(username=value).exists():
            # Marked for translation like the other user-facing errors of this module
            raise ValidationError(_("Username is already taken."))
        return value

    return username_field_validator
181
182
def password_single_validator_factory() -> Callable[[PromptChallengeResponse, str], Any]:
    """Return a `validate_<field_key>` method for a password prompt.

    The returned function is meant to be bound to a `PromptChallengeResponse`
    instance; it hands the password to the receivers of the `password_validate`
    signal and returns it unchanged."""

    def _emit_password_validate(self: PromptChallengeResponse, value: str) -> Any:
        """Send password validation signals for e.g. LDAP Source"""
        plan_context = self.plan.context
        password_validate.send(sender=self, password=value, plan_context=plan_context)
        return value

    return _emit_password_validate
193
194
class ListPolicyEngine(PolicyEngine):
    """Slightly modified policy engine, which uses a list instead of a PolicyBindingModel

    The parent engine is initialised with an empty `PolicyBindingModel`; the
    bindings it evaluates are created on the fly from the given policies."""

    # NOTE(review): in this module `Policy` is imported from `email.policy`, which looks
    # unintended - presumably the policy model of authentik.policies.models is meant.
    def __init__(self, policies: list[Policy], user: User, request: HttpRequest = None) -> None:
        super().__init__(PolicyBindingModel(), user, request)
        # Results of this engine are never cached
        self.use_cache = False
        self.__list = policies

    def bindings(self):
        """Yield one `PolicyBinding` per policy, ordered by position in the list"""
        yield from (
            PolicyBinding(policy=policy, order=position)
            for position, policy in enumerate(self.__list)
        )
209
210
class PromptStageView(ChallengeStageView):
    """Prompt Stage, save form data in plan context."""

    response_class = PromptChallengeResponse

    def get_prompt_challenge_fields(self, fields: list[Prompt], context: dict, dry_run=False):
        """Get serializers for all fields in `fields`, using the context `context`.
        If `dry_run` is set, property mapping expression errors are raised, otherwise they
        are logged and events are created

        Returns a list with the serialized data of each field, in the order of `fields`."""
        # The pending user does not depend on the field, so look it up once instead of
        # up to three times per field
        pending_user = self.get_pending_user()
        serializers = []
        for field in fields:
            data = StagePromptSerializer(field).data
            # choices can be a dict with value and label
            choices = field.get_choices(context, pending_user, self.request, dry_run)
            data["choices"] = list(self.clean_choices(choices)) if choices else None
            # Ensure all placeholders and initial values are str, as
            # otherwise further in we can fail serializer validation if we return
            # some types such as bool
            data["placeholder"] = str(
                field.get_placeholder(context, pending_user, self.request, dry_run)
            )
            data["initial_value"] = str(
                field.get_initial_value(context, pending_user, self.request, dry_run)
            )
            serializers.append(data)
        return serializers

    def clean_choices(self, choices):
        """Yield every choice as a dict with `str` entries for `label` and `value`.

        Dict choices contribute their `label` and `value` entries (an empty string
        when the entry is missing); any other choice is used as both label and value."""
        for choice in choices:
            label, value = choice, choice
            if isinstance(choice, dict):
                label = choice.get("label", "")
                value = choice.get("value", "")
            yield {"label": str(label), "value": str(value)}

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the `PromptChallenge` from the prompts of the current stage, sorted by
        their `order`, using the prompt data already stored in the plan context."""
        fields: list[Prompt] = list(self.executor.current_stage.fields.all().order_by("order"))
        context_prompt = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})
        serializers = self.get_prompt_challenge_fields(fields, context_prompt)
        challenge = PromptChallenge(
            data={
                "fields": serializers,
            },
        )
        return challenge

    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
        """Return a `PromptChallengeResponse` for the submitted `data`, bound to the
        current stage, the flow plan, the request and the pending user.

        Raises `ValueError` when the executor has no plan."""
        if not self.executor.plan:  # pragma: no cover
            raise ValueError
        return PromptChallengeResponse(
            instance=None,
            data=data,
            request=self.request,
            stage_instance=self.executor.current_stage,
            stage=self,
            plan=self.executor.plan,
            user=self.get_pending_user(),
        )

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Merge the validated response data into the prompt data of the plan context
        and continue with the next stage."""
        # setdefault() creates the prompt data dict on first use and returns the existing
        # one otherwise, so values of earlier prompt stages are kept
        prompt_data = self.executor.plan.context.setdefault(PLAN_CONTEXT_PROMPT, {})
        prompt_data.update(response.validated_data)
        return self.executor.stage_ok()
PLAN_CONTEXT_PROMPT = 'prompt_data'
class PromptChoiceSerializer(authentik.core.api.utils.PassiveSerializer):
37class PromptChoiceSerializer(PassiveSerializer):
38    """Serializer for a single Choice field"""
39
40    value = CharField(required=True)
41    label = CharField(required=True)

Serializer for a single Choice field

value
label
class StagePromptSerializer(authentik.core.api.utils.PassiveSerializer):
44class StagePromptSerializer(PassiveSerializer):
45    """Serializer for a single Prompt field"""
46
47    field_key = CharField()
48    label = CharField(allow_blank=True)
49    type = ChoiceField(choices=FieldTypes.choices)
50    required = BooleanField()
51    placeholder = CharField(allow_blank=True)
52    initial_value = CharField(allow_blank=True)
53    order = IntegerField()
54    sub_text = CharField(allow_blank=True)
55    choices = ListField(child=PromptChoiceSerializer(), allow_empty=True, allow_null=True)

Serializer for a single Prompt field

field_key
label
type
required
placeholder
initial_value
order
sub_text
choices
class PromptChallenge(authentik.flows.challenge.Challenge):
58class PromptChallenge(Challenge):
59    """Initial challenge being sent, define fields"""
60
61    fields = StagePromptSerializer(many=True)
62    component = CharField(default="ak-stage-prompt")

Initial challenge being sent, define fields

fields

A dictionary of {field_name: field_instance}.

component
class PromptChallengeResponse(authentik.flows.challenge.ChallengeResponse):
 65class PromptChallengeResponse(ChallengeResponse):
 66    """Validate response, fields are dynamically created based
 67    on the stage"""
 68
 69    stage_instance: PromptStage
 70
 71    component = CharField(default="ak-stage-prompt")
 72
 73    def __init__(self, *args, **kwargs):
 74        stage: PromptStage = kwargs.pop("stage_instance", None)
 75        plan: FlowPlan = kwargs.pop("plan", None)
 76        request: HttpRequest = kwargs.pop("request", None)
 77        user: User = kwargs.pop("user", None)
 78        super().__init__(*args, **kwargs)
 79        self.stage_instance = stage
 80        self.plan = plan
 81        self.request = request
 82        if not self.stage_instance:
 83            return
 84        # list() is called so we only load the fields once
 85        fields = list(self.stage_instance.fields.all())
 86        for field in fields:
 87            field: Prompt
 88            choices = field.get_choices(
 89                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 90            )
 91            current = field.get_initial_value(
 92                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 93            )
 94            self.fields[field.field_key] = field.field(current, choices)
 95            # Special handling for fields with username type
 96            # these check for existing users with the same username
 97            if field.type == FieldTypes.USERNAME:
 98                setattr(
 99                    self,
100                    f"validate_{field.field_key}",
101                    MethodType(username_field_validator_factory(), self),
102                )
103            # Check if we have a password field, add a handler that sends a signal
104            # to validate it
105            if field.type == FieldTypes.PASSWORD:
106                setattr(
107                    self,
108                    f"validate_{field.field_key}",
109                    MethodType(password_single_validator_factory(), self),
110                )
111
112        self.field_order = sorted(fields, key=lambda x: x.order)
113
114    def _validate_password_fields(self, *field_names):
115        """Check if the value of all password fields match by merging them into a set
116        and checking the length"""
117        all_passwords = {self.initial_data[x] for x in field_names}
118        if len(all_passwords) > 1:
119            raise ValidationError(_("Passwords don't match."))
120
121    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
122        # Check if we have any static or hidden fields, and ensure they
123        # still have the same value
124        static_hidden_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
125            type__in=[
126                FieldTypes.HIDDEN,
127                FieldTypes.STATIC,
128                FieldTypes.ALERT_INFO,
129                FieldTypes.ALERT_WARNING,
130                FieldTypes.ALERT_DANGER,
131                FieldTypes.TEXT_READ_ONLY,
132                FieldTypes.TEXT_AREA_READ_ONLY,
133            ]
134        )
135        for static_hidden in static_hidden_fields:
136            field = self.fields[static_hidden.field_key]
137            default = field.default
138            # Prevent rest_framework.fields.empty from ending up in policies and events
139            if default == empty:
140                default = ""
141            attrs[static_hidden.field_key] = default
142
143        # Check if we have two password fields, and make sure they are the same
144        password_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
145            type=FieldTypes.PASSWORD
146        )
147        if password_fields.exists() and password_fields.count() == 2:  # noqa: PLR2004
148            self._validate_password_fields(*[field.field_key for field in password_fields])
149
150        engine = ListPolicyEngine(
151            self.stage_instance.validation_policies.all(),
152            self.stage.get_pending_user(),
153            self.request,
154        )
155        engine.mode = PolicyEngineMode.MODE_ALL
156        engine.request.context[PLAN_CONTEXT_PROMPT] = attrs
157        engine.use_cache = False
158        engine.build()
159        result = engine.result
160        if not result.passing:
161            raise ValidationError(list(result.messages))
162        else:
163            for msg in result.messages:
164                add_message(self.request, INFO, msg)
165        return attrs

Validate response, fields are dynamically created based on the stage

PromptChallengeResponse(*args, **kwargs)
 73    def __init__(self, *args, **kwargs):
 74        stage: PromptStage = kwargs.pop("stage_instance", None)
 75        plan: FlowPlan = kwargs.pop("plan", None)
 76        request: HttpRequest = kwargs.pop("request", None)
 77        user: User = kwargs.pop("user", None)
 78        super().__init__(*args, **kwargs)
 79        self.stage_instance = stage
 80        self.plan = plan
 81        self.request = request
 82        if not self.stage_instance:
 83            return
 84        # list() is called so we only load the fields once
 85        fields = list(self.stage_instance.fields.all())
 86        for field in fields:
 87            field: Prompt
 88            choices = field.get_choices(
 89                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 90            )
 91            current = field.get_initial_value(
 92                plan.context.get(PLAN_CONTEXT_PROMPT, {}), user, self.request
 93            )
 94            self.fields[field.field_key] = field.field(current, choices)
 95            # Special handling for fields with username type
 96            # these check for existing users with the same username
 97            if field.type == FieldTypes.USERNAME:
 98                setattr(
 99                    self,
100                    f"validate_{field.field_key}",
101                    MethodType(username_field_validator_factory(), self),
102                )
103            # Check if we have a password field, add a handler that sends a signal
104            # to validate it
105            if field.type == FieldTypes.PASSWORD:
106                setattr(
107                    self,
108                    f"validate_{field.field_key}",
109                    MethodType(password_single_validator_factory(), self),
110                )
111
112        self.field_order = sorted(fields, key=lambda x: x.order)
component
plan
request
field_order
def validate(self, attrs: dict[str, typing.Any]) -> dict[str, typing.Any]:
121    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
122        # Check if we have any static or hidden fields, and ensure they
123        # still have the same value
124        static_hidden_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
125            type__in=[
126                FieldTypes.HIDDEN,
127                FieldTypes.STATIC,
128                FieldTypes.ALERT_INFO,
129                FieldTypes.ALERT_WARNING,
130                FieldTypes.ALERT_DANGER,
131                FieldTypes.TEXT_READ_ONLY,
132                FieldTypes.TEXT_AREA_READ_ONLY,
133            ]
134        )
135        for static_hidden in static_hidden_fields:
136            field = self.fields[static_hidden.field_key]
137            default = field.default
138            # Prevent rest_framework.fields.empty from ending up in policies and events
139            if default == empty:
140                default = ""
141            attrs[static_hidden.field_key] = default
142
143        # Check if we have two password fields, and make sure they are the same
144        password_fields: QuerySet[Prompt] = self.stage_instance.fields.filter(
145            type=FieldTypes.PASSWORD
146        )
147        if password_fields.exists() and password_fields.count() == 2:  # noqa: PLR2004
148            self._validate_password_fields(*[field.field_key for field in password_fields])
149
150        engine = ListPolicyEngine(
151            self.stage_instance.validation_policies.all(),
152            self.stage.get_pending_user(),
153            self.request,
154        )
155        engine.mode = PolicyEngineMode.MODE_ALL
156        engine.request.context[PLAN_CONTEXT_PROMPT] = attrs
157        engine.use_cache = False
158        engine.build()
159        result = engine.result
160        if not result.passing:
161            raise ValidationError(list(result.messages))
162        else:
163            for msg in result.messages:
164                add_message(self.request, INFO, msg)
165        return attrs
def username_field_validator_factory() -> Callable[[PromptChallengeResponse, str], typing.Any]:
168def username_field_validator_factory() -> Callable[[PromptChallengeResponse, str], Any]:
169    """Return a `clean_` method for `field`. Clean method checks if username is taken already."""
170
171    def username_field_validator(self: PromptChallengeResponse, value: str) -> Any:
172        """Check for duplicate usernames"""
173        pending_user = self.stage.get_pending_user()
174        query = User.objects.all()
175        if pending_user.pk:
176            query = query.exclude(username=pending_user.username)
177        if query.filter(username=value).exists():
178            raise ValidationError("Username is already taken.")
179        return value
180
181    return username_field_validator

Return a `validate_<field_key>` method for a username field. The returned validator checks if the username is taken already.

def password_single_validator_factory() -> Callable[[PromptChallengeResponse, str], typing.Any]:
184def password_single_validator_factory() -> Callable[[PromptChallengeResponse, str], Any]:
185    """Return a `clean_` method for `field`. Clean method checks if the password meets configured
186    PasswordPolicy."""
187
188    def password_single_clean(self: PromptChallengeResponse, value: str) -> Any:
189        """Send password validation signals for e.g. LDAP Source"""
190        password_validate.send(sender=self, password=value, plan_context=self.plan.context)
191        return value
192
193    return password_single_clean

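Return a `validate_<field_key>` method for a password field. The returned validator sends the `password_validate` signal so that its receivers (e.g. an LDAP Source, and presumably the configured password policy) can check the password.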

class ListPolicyEngine(authentik.policies.engine.PolicyEngine):
196class ListPolicyEngine(PolicyEngine):
197    """Slightly modified policy engine, which uses a list instead of a PolicyBindingModel"""
198
199    def __init__(self, policies: list[Policy], user: User, request: HttpRequest = None) -> None:
200        super().__init__(PolicyBindingModel(), user, request)
201        self.__list = policies
202        self.use_cache = False
203
204    def bindings(self):
205        for idx, policy in enumerate(self.__list):
206            yield PolicyBinding(
207                policy=policy,
208                order=idx,
209            )

Slightly modified policy engine, which uses a list instead of a PolicyBindingModel

ListPolicyEngine( policies: list[email._policybase.Policy], user: authentik.core.models.User, request: django.http.request.HttpRequest = None)
199    def __init__(self, policies: list[Policy], user: User, request: HttpRequest = None) -> None:
200        super().__init__(PolicyBindingModel(), user, request)
201        self.__list = policies
202        self.use_cache = False
use_cache
def bindings(self):
204    def bindings(self):
205        for idx, policy in enumerate(self.__list):
206            yield PolicyBinding(
207                policy=policy,
208                order=idx,
209            )

Yield one `PolicyBinding` per policy in the list, using the policy's position in the list as the binding order.

class PromptStageView(authentik.flows.stage.ChallengeStageView):
212class PromptStageView(ChallengeStageView):
213    """Prompt Stage, save form data in plan context."""
214
215    response_class = PromptChallengeResponse
216
217    def get_prompt_challenge_fields(self, fields: list[Prompt], context: dict, dry_run=False):
218        """Get serializers for all fields in `fields`, using the context `context`.
219        If `dry_run` is set, property mapping expression errors are raised, otherwise they
220        are logged and events are created"""
221        serializers = []
222        for field in fields:
223            data = StagePromptSerializer(field).data
224            # Ensure all placeholders and initial values are str, as
225            # otherwise further in we can fail serializer validation if we return
226            # some types such as bool
227            # choices can be a dict with value and label
228            choices = field.get_choices(context, self.get_pending_user(), self.request, dry_run)
229            if choices:
230                data["choices"] = list(self.clean_choices(choices))
231            else:
232                data["choices"] = None
233            data["placeholder"] = str(
234                field.get_placeholder(context, self.get_pending_user(), self.request, dry_run)
235            )
236            data["initial_value"] = str(
237                field.get_initial_value(context, self.get_pending_user(), self.request, dry_run)
238            )
239            serializers.append(data)
240        return serializers
241
242    def clean_choices(self, choices):
243        for choice in choices:
244            label, value = choice, choice
245            if isinstance(choice, dict):
246                label = choice.get("label", "")
247                value = choice.get("value", "")
248            yield {"label": str(label), "value": str(value)}
249
250    def get_challenge(self, *args, **kwargs) -> Challenge:
251        fields: list[Prompt] = list(self.executor.current_stage.fields.all().order_by("order"))
252        context_prompt = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})
253        serializers = self.get_prompt_challenge_fields(fields, context_prompt)
254        challenge = PromptChallenge(
255            data={
256                "fields": serializers,
257            },
258        )
259        return challenge
260
261    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
262        if not self.executor.plan:  # pragma: no cover
263            raise ValueError
264        return PromptChallengeResponse(
265            instance=None,
266            data=data,
267            request=self.request,
268            stage_instance=self.executor.current_stage,
269            stage=self,
270            plan=self.executor.plan,
271            user=self.get_pending_user(),
272        )
273
274    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
275        if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
276            self.executor.plan.context[PLAN_CONTEXT_PROMPT] = {}
277        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(response.validated_data)
278        return self.executor.stage_ok()

Prompt Stage, save form data in plan context.

response_class = <class 'PromptChallengeResponse'>
def get_prompt_challenge_fields( self, fields: list[authentik.stages.prompt.models.Prompt], context: dict, dry_run=False):
217    def get_prompt_challenge_fields(self, fields: list[Prompt], context: dict, dry_run=False):
218        """Get serializers for all fields in `fields`, using the context `context`.
219        If `dry_run` is set, property mapping expression errors are raised, otherwise they
220        are logged and events are created"""
221        serializers = []
222        for field in fields:
223            data = StagePromptSerializer(field).data
224            # Ensure all placeholders and initial values are str, as
225            # otherwise further in we can fail serializer validation if we return
226            # some types such as bool
227            # choices can be a dict with value and label
228            choices = field.get_choices(context, self.get_pending_user(), self.request, dry_run)
229            if choices:
230                data["choices"] = list(self.clean_choices(choices))
231            else:
232                data["choices"] = None
233            data["placeholder"] = str(
234                field.get_placeholder(context, self.get_pending_user(), self.request, dry_run)
235            )
236            data["initial_value"] = str(
237                field.get_initial_value(context, self.get_pending_user(), self.request, dry_run)
238            )
239            serializers.append(data)
240        return serializers

Get serializers for all fields in `fields`, using the context `context`. If `dry_run` is set, property mapping expression errors are raised; otherwise they are logged and events are created.

def clean_choices(self, choices):
242    def clean_choices(self, choices):
243        for choice in choices:
244            label, value = choice, choice
245            if isinstance(choice, dict):
246                label = choice.get("label", "")
247                value = choice.get("value", "")
248            yield {"label": str(label), "value": str(value)}
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
250    def get_challenge(self, *args, **kwargs) -> Challenge:
251        fields: list[Prompt] = list(self.executor.current_stage.fields.all().order_by("order"))
252        context_prompt = self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})
253        serializers = self.get_prompt_challenge_fields(fields, context_prompt)
254        challenge = PromptChallenge(
255            data={
256                "fields": serializers,
257            },
258        )
259        return challenge

Return the challenge that the client should solve

def get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
261    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
262        if not self.executor.plan:  # pragma: no cover
263            raise ValueError
264        return PromptChallengeResponse(
265            instance=None,
266            data=data,
267            request=self.request,
268            stage_instance=self.executor.current_stage,
269            stage=self,
270            plan=self.executor.plan,
271            user=self.get_pending_user(),
272        )

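Return a `PromptChallengeResponse` instance for the submitted data, bound to the current stage, the flow plan, the request and the pending user.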

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
274    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
275        if PLAN_CONTEXT_PROMPT not in self.executor.plan.context:
276            self.executor.plan.context[PLAN_CONTEXT_PROMPT] = {}
277        self.executor.plan.context[PLAN_CONTEXT_PROMPT].update(response.validated_data)
278        return self.executor.stage_ok()

Callback when the challenge has the correct format