authentik.stages.password.stage

authentik password stage

  1"""authentik password stage"""
  2
  3from typing import Any
  4
  5from django.contrib.auth import _clean_credentials
  6from django.contrib.auth.backends import BaseBackend
  7from django.core.exceptions import PermissionDenied
  8from django.db.models import Sum
  9from django.http import HttpRequest, HttpResponse
 10from django.urls import reverse
 11from django.utils.translation import gettext as _
 12from rest_framework.exceptions import ValidationError
 13from rest_framework.fields import BooleanField, CharField
 14from sentry_sdk import start_span
 15from structlog.stdlib import get_logger
 16
 17from authentik.core.models import User
 18from authentik.core.signals import login_failed
 19from authentik.flows.challenge import (
 20    Challenge,
 21    ChallengeResponse,
 22    WithUserInfoChallenge,
 23)
 24from authentik.flows.exceptions import StageInvalidException
 25from authentik.flows.models import Flow, Stage
 26from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 27from authentik.flows.stage import ChallengeStageView
 28from authentik.lib.utils.reflection import path_to_class
 29from authentik.policies.reputation.models import Reputation
 30from authentik.stages.password.models import PasswordStage
 31
# Module-level structlog logger; used by authenticate() below.
LOGGER = get_logger()
# Plan-context key under which validate_password() stores the dotted path of
# the backend that authenticated the user (copied from ``user.backend``).
PLAN_CONTEXT_AUTHENTICATION_BACKEND = "user_backend"
# NOTE(review): the two keys below are not referenced anywhere in this module;
# presumably they are imported by other stages/sources -- confirm before changing.
PLAN_CONTEXT_METHOD = "auth_method"
PLAN_CONTEXT_METHOD_ARGS = "auth_method_args"
# Plan-context key holding the pending user's summed reputation score as
# captured by PasswordStageView; challenge_invalid() compares the current score
# against it to decide when to cancel the stage.
PLAN_CONTEXT_INITIAL_SCORE = "goauthentik.io/stages/password/initial_score"
 37
 38
 39def authenticate(
 40    request: HttpRequest, backends: list[str], stage: Stage | None = None, **credentials: Any
 41) -> User | None:
 42    """If the given credentials are valid, return a User object.
 43
 44    Customized version of django's authenticate, which accepts a list of backends"""
 45    for backend_path in backends:
 46        try:
 47            backend: BaseBackend = path_to_class(backend_path)()
 48        except ImportError:
 49            LOGGER.warning("Failed to import backend", path=backend_path)
 50            continue
 51        LOGGER.debug("Attempting authentication...", backend=backend_path)
 52        with start_span(
 53            op="authentik.stages.password.authenticate",
 54            name=backend_path,
 55        ):
 56            user = backend.authenticate(request, **credentials)
 57        if user is None:
 58            LOGGER.debug("Backend returned nothing, continuing", backend=backend_path)
 59            continue
 60        # Annotate the user object with the path of the backend.
 61        user.backend = backend_path
 62        LOGGER.info("Successful authentication", user=user.username, backend=backend_path)
 63        return user
 64
 65    # The credentials supplied are invalid to all backends, fire signal
 66    login_failed.send(
 67        sender=__name__,
 68        credentials=_clean_credentials(credentials),
 69        request=request,
 70        stage=stage,
 71    )
 72
 73
class PasswordChallenge(WithUserInfoChallenge):
    """Password challenge UI fields

    Serializer for the data sent to the client when the password prompt is
    shown; instances are built in PasswordStageView.get_challenge()."""

    # Optional absolute URL of the brand's recovery flow; get_challenge() only
    # fills it in when the current brand has a recovery flow configured.
    recovery_url = CharField(required=False)

    # Component identifier sent with the challenge; defaults to the password stage component.
    component = CharField(default="ak-stage-password")

    # Populated from the current stage's `allow_show_password` setting in
    # get_challenge(); defaults to False when not supplied.
    allow_show_password = BooleanField(default=False)
 82
 83
 84class PasswordChallengeResponse(ChallengeResponse):
 85    """Password challenge response"""
 86
 87    component = CharField(default="ak-stage-password")
 88
 89    password = CharField(trim_whitespace=False)
 90
 91    def validate_password(self, password: str) -> str | None:
 92        """Validate password and authenticate user"""
 93        executor = self.stage.executor
 94        if PLAN_CONTEXT_PENDING_USER not in executor.plan.context:
 95            raise StageInvalidException("No pending user")
 96        # Get the pending user's username, which is used as
 97        # an Identifier by most authentication backends
 98        pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER]
 99        auth_kwargs = {
100            "password": password,
101            "username": pending_user.username,
102        }
103        try:
104            with start_span(
105                op="authentik.stages.password.authenticate",
106                name="User authenticate call",
107            ):
108                user = authenticate(
109                    self.stage.request,
110                    executor.current_stage.backends,
111                    executor.current_stage,
112                    **auth_kwargs,
113                )
114        except PermissionDenied as exc:
115            del auth_kwargs["password"]
116            # User was found, but permission was denied (i.e. user is not active)
117            self.stage.logger.debug("Denied access", **auth_kwargs)
118            raise StageInvalidException("Denied access") from exc
119        except ValidationError as exc:
120            del auth_kwargs["password"]
121            # User was found, authentication succeeded, but another signal raised an error
122            # (most likely LDAP)
123            self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs)
124            raise StageInvalidException("Validation error") from exc
125        if not user:
126            # No user was found -> invalid credentials
127            self.stage.logger.info("Invalid credentials")
128            raise ValidationError(_("Invalid password"), "invalid")
129        # User instance returned from authenticate() has .backend property set
130        executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
131        executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend
132        return password
133
134
class PasswordStageView(ChallengeStageView):
    """Flow stage that asks for a password and checks it through the stage's backends"""

    response_class = PasswordChallengeResponse

    def get_challenge(self) -> Challenge:
        """Build the password challenge for the client.

        Adds the absolute recovery URL when the brand has a recovery flow, and
        records the pending user's current reputation score in the plan context
        if none has been recorded yet."""
        payload = {
            "allow_show_password": self.executor.current_stage.allow_show_password,
        }
        challenge = PasswordChallenge(data=payload)
        recovery_flow: Flow | None = self.request.brand.flow_recovery
        if recovery_flow:
            recovery_path = reverse(
                "authentik_core:if-flow",
                kwargs={"flow_slug": recovery_flow.slug},
            )
            challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recovery_path)
        context = self.executor.plan.context
        if PLAN_CONTEXT_INITIAL_SCORE not in context:
            # Baseline that challenge_invalid() later compares against.
            context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score()
        return challenge

    def get_reputation_score(self) -> int:
        """Return the summed reputation score recorded for the pending user's username.

        Returns 0 when the aggregate yields no value."""
        username = self.get_pending_user().username
        matching = Reputation.objects.filter(identifier=username)
        totals = matching.aggregate(total_score=Sum("score"))
        return totals["total_score"] or 0

    def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse:
        """Handle a response that failed validation.

        Once the reputation score has dropped from the recorded baseline by at
        least the stage's `failed_attempts_before_cancel`, the stage is marked
        invalid; otherwise the default invalid-challenge handling is used."""
        current_stage: PasswordStage = self.executor.current_stage
        context = self.executor.plan.context
        baseline = context.get(PLAN_CONTEXT_INITIAL_SCORE)
        if baseline is None:
            # No baseline captured yet: use the current score as the starting point.
            baseline = self.get_reputation_score()
            context[PLAN_CONTEXT_INITIAL_SCORE] = baseline
        score_drop = baseline - self.get_reputation_score()
        if score_drop < current_stage.failed_attempts_before_cancel:
            return super().challenge_invalid(response)
        self.logger.debug("User has exceeded maximum tries")
        return self.executor.stage_invalid(_("Invalid password"))

    def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse:
        """Finish the stage after the response validated successfully.

        The stage succeeds only while a pending user is present in the plan
        context; otherwise it is marked invalid."""
        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
            return self.executor.stage_ok()
        return self.executor.stage_invalid()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_AUTHENTICATION_BACKEND = 'user_backend'
PLAN_CONTEXT_METHOD = 'auth_method'
PLAN_CONTEXT_METHOD_ARGS = 'auth_method_args'
PLAN_CONTEXT_INITIAL_SCORE = 'goauthentik.io/stages/password/initial_score'
def authenticate( request: django.http.request.HttpRequest, backends: list[str], stage: authentik.flows.models.Stage | None = None, **credentials: Any) -> authentik.core.models.User | None:
40def authenticate(
41    request: HttpRequest, backends: list[str], stage: Stage | None = None, **credentials: Any
42) -> User | None:
43    """If the given credentials are valid, return a User object.
44
45    Customized version of django's authenticate, which accepts a list of backends"""
46    for backend_path in backends:
47        try:
48            backend: BaseBackend = path_to_class(backend_path)()
49        except ImportError:
50            LOGGER.warning("Failed to import backend", path=backend_path)
51            continue
52        LOGGER.debug("Attempting authentication...", backend=backend_path)
53        with start_span(
54            op="authentik.stages.password.authenticate",
55            name=backend_path,
56        ):
57            user = backend.authenticate(request, **credentials)
58        if user is None:
59            LOGGER.debug("Backend returned nothing, continuing", backend=backend_path)
60            continue
61        # Annotate the user object with the path of the backend.
62        user.backend = backend_path
63        LOGGER.info("Successful authentication", user=user.username, backend=backend_path)
64        return user
65
66    # The credentials supplied are invalid to all backends, fire signal
67    login_failed.send(
68        sender=__name__,
69        credentials=_clean_credentials(credentials),
70        request=request,
71        stage=stage,
72    )

If the given credentials are valid, return a User object.

Customized version of django's authenticate, which accepts a list of backends

class PasswordChallenge(authentik.flows.challenge.WithUserInfoChallenge):
75class PasswordChallenge(WithUserInfoChallenge):
76    """Password challenge UI fields"""
77
78    recovery_url = CharField(required=False)
79
80    component = CharField(default="ak-stage-password")
81
82    allow_show_password = BooleanField(default=False)

Password challenge UI fields

recovery_url
component
allow_show_password
class PasswordChallengeResponse(authentik.flows.challenge.ChallengeResponse):
 85class PasswordChallengeResponse(ChallengeResponse):
 86    """Password challenge response"""
 87
 88    component = CharField(default="ak-stage-password")
 89
 90    password = CharField(trim_whitespace=False)
 91
 92    def validate_password(self, password: str) -> str | None:
 93        """Validate password and authenticate user"""
 94        executor = self.stage.executor
 95        if PLAN_CONTEXT_PENDING_USER not in executor.plan.context:
 96            raise StageInvalidException("No pending user")
 97        # Get the pending user's username, which is used as
 98        # an Identifier by most authentication backends
 99        pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER]
100        auth_kwargs = {
101            "password": password,
102            "username": pending_user.username,
103        }
104        try:
105            with start_span(
106                op="authentik.stages.password.authenticate",
107                name="User authenticate call",
108            ):
109                user = authenticate(
110                    self.stage.request,
111                    executor.current_stage.backends,
112                    executor.current_stage,
113                    **auth_kwargs,
114                )
115        except PermissionDenied as exc:
116            del auth_kwargs["password"]
117            # User was found, but permission was denied (i.e. user is not active)
118            self.stage.logger.debug("Denied access", **auth_kwargs)
119            raise StageInvalidException("Denied access") from exc
120        except ValidationError as exc:
121            del auth_kwargs["password"]
122            # User was found, authentication succeeded, but another signal raised an error
123            # (most likely LDAP)
124            self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs)
125            raise StageInvalidException("Validation error") from exc
126        if not user:
127            # No user was found -> invalid credentials
128            self.stage.logger.info("Invalid credentials")
129            raise ValidationError(_("Invalid password"), "invalid")
130        # User instance returned from authenticate() has .backend property set
131        executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
132        executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend
133        return password

Password challenge response

component
password
def validate_password(self, password: str) -> str | None:
 92    def validate_password(self, password: str) -> str | None:
 93        """Validate password and authenticate user"""
 94        executor = self.stage.executor
 95        if PLAN_CONTEXT_PENDING_USER not in executor.plan.context:
 96            raise StageInvalidException("No pending user")
 97        # Get the pending user's username, which is used as
 98        # an Identifier by most authentication backends
 99        pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER]
100        auth_kwargs = {
101            "password": password,
102            "username": pending_user.username,
103        }
104        try:
105            with start_span(
106                op="authentik.stages.password.authenticate",
107                name="User authenticate call",
108            ):
109                user = authenticate(
110                    self.stage.request,
111                    executor.current_stage.backends,
112                    executor.current_stage,
113                    **auth_kwargs,
114                )
115        except PermissionDenied as exc:
116            del auth_kwargs["password"]
117            # User was found, but permission was denied (i.e. user is not active)
118            self.stage.logger.debug("Denied access", **auth_kwargs)
119            raise StageInvalidException("Denied access") from exc
120        except ValidationError as exc:
121            del auth_kwargs["password"]
122            # User was found, authentication succeeded, but another signal raised an error
123            # (most likely LDAP)
124            self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs)
125            raise StageInvalidException("Validation error") from exc
126        if not user:
127            # No user was found -> invalid credentials
128            self.stage.logger.info("Invalid credentials")
129            raise ValidationError(_("Invalid password"), "invalid")
130        # User instance returned from authenticate() has .backend property set
131        executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user
132        executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend
133        return password

Validate password and authenticate user

class PasswordStageView(authentik.flows.stage.ChallengeStageView):
136class PasswordStageView(ChallengeStageView):
137    """Authentication stage which authenticates against django's AuthBackend"""
138
139    response_class = PasswordChallengeResponse
140
141    def get_challenge(self) -> Challenge:
142        challenge = PasswordChallenge(
143            data={
144                "allow_show_password": self.executor.current_stage.allow_show_password,
145            }
146        )
147        recovery_flow: Flow | None = self.request.brand.flow_recovery
148        if recovery_flow:
149            recover_url = reverse(
150                "authentik_core:if-flow",
151                kwargs={"flow_slug": recovery_flow.slug},
152            )
153            challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recover_url)
154        if PLAN_CONTEXT_INITIAL_SCORE not in self.executor.plan.context:
155            self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score()
156        return challenge
157
158    def get_reputation_score(self) -> int:
159        return (
160            Reputation.objects.filter(identifier=self.get_pending_user().username).aggregate(
161                total_score=Sum("score")
162            )["total_score"]
163            or 0
164        )
165
166    def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse:
167        current_stage: PasswordStage = self.executor.current_stage
168        initial_score = self.executor.plan.context.get(PLAN_CONTEXT_INITIAL_SCORE)
169        if initial_score is None:
170            initial_score = self.get_reputation_score()
171            self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = initial_score
172        new_score = self.get_reputation_score()
173        if (initial_score - new_score) >= current_stage.failed_attempts_before_cancel:
174            self.logger.debug("User has exceeded maximum tries")
175            return self.executor.stage_invalid(_("Invalid password"))
176        return super().challenge_invalid(response)
177
178    def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse:
179        """Authenticate against django's authentication backend"""
180        if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
181            return self.executor.stage_invalid()
182        return self.executor.stage_ok()

Authentication stage which authenticates against django's AuthBackend

response_class = <class 'PasswordChallengeResponse'>
def get_challenge(self) -> authentik.flows.challenge.Challenge:
141    def get_challenge(self) -> Challenge:
142        challenge = PasswordChallenge(
143            data={
144                "allow_show_password": self.executor.current_stage.allow_show_password,
145            }
146        )
147        recovery_flow: Flow | None = self.request.brand.flow_recovery
148        if recovery_flow:
149            recover_url = reverse(
150                "authentik_core:if-flow",
151                kwargs={"flow_slug": recovery_flow.slug},
152            )
153            challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recover_url)
154        if PLAN_CONTEXT_INITIAL_SCORE not in self.executor.plan.context:
155            self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score()
156        return challenge

Return the challenge that the client should solve

def get_reputation_score(self) -> int:
158    def get_reputation_score(self) -> int:
159        return (
160            Reputation.objects.filter(identifier=self.get_pending_user().username).aggregate(
161                total_score=Sum("score")
162            )["total_score"]
163            or 0
164        )
def challenge_invalid( self, response: PasswordChallengeResponse) -> django.http.response.HttpResponse:
166    def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse:
167        current_stage: PasswordStage = self.executor.current_stage
168        initial_score = self.executor.plan.context.get(PLAN_CONTEXT_INITIAL_SCORE)
169        if initial_score is None:
170            initial_score = self.get_reputation_score()
171            self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = initial_score
172        new_score = self.get_reputation_score()
173        if (initial_score - new_score) >= current_stage.failed_attempts_before_cancel:
174            self.logger.debug("User has exceeded maximum tries")
175            return self.executor.stage_invalid(_("Invalid password"))
176        return super().challenge_invalid(response)

Callback when the challenge has the incorrect format

def challenge_valid( self, response: PasswordChallengeResponse) -> django.http.response.HttpResponse:
178    def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse:
179        """Authenticate against django's authentication backend"""
180        if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
181            return self.executor.stage_invalid()
182        return self.executor.stage_ok()

Authenticate against django's authentication backend