authentik.stages.password.stage
authentik password stage
1"""authentik password stage""" 2 3from typing import Any 4 5from django.contrib.auth import _clean_credentials 6from django.contrib.auth.backends import BaseBackend 7from django.core.exceptions import PermissionDenied 8from django.db.models import Sum 9from django.http import HttpRequest, HttpResponse 10from django.urls import reverse 11from django.utils.translation import gettext as _ 12from rest_framework.exceptions import ValidationError 13from rest_framework.fields import BooleanField, CharField 14from sentry_sdk import start_span 15from structlog.stdlib import get_logger 16 17from authentik.core.models import User 18from authentik.core.signals import login_failed 19from authentik.flows.challenge import ( 20 Challenge, 21 ChallengeResponse, 22 WithUserInfoChallenge, 23) 24from authentik.flows.exceptions import StageInvalidException 25from authentik.flows.models import Flow, Stage 26from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 27from authentik.flows.stage import ChallengeStageView 28from authentik.lib.utils.reflection import path_to_class 29from authentik.policies.reputation.models import Reputation 30from authentik.stages.password.models import PasswordStage 31 32LOGGER = get_logger() 33PLAN_CONTEXT_AUTHENTICATION_BACKEND = "user_backend" 34PLAN_CONTEXT_METHOD = "auth_method" 35PLAN_CONTEXT_METHOD_ARGS = "auth_method_args" 36PLAN_CONTEXT_INITIAL_SCORE = "goauthentik.io/stages/password/initial_score" 37 38 39def authenticate( 40 request: HttpRequest, backends: list[str], stage: Stage | None = None, **credentials: Any 41) -> User | None: 42 """If the given credentials are valid, return a User object. 43 44 Customized version of django's authenticate, which accepts a list of backends""" 45 for backend_path in backends: 46 try: 47 backend: BaseBackend = path_to_class(backend_path)() 48 except ImportError: 49 LOGGER.warning("Failed to import backend", path=backend_path) 50 continue 51 LOGGER.debug("Attempting authentication...", backend=backend_path) 52 with start_span( 53 op="authentik.stages.password.authenticate", 54 name=backend_path, 55 ): 56 user = backend.authenticate(request, **credentials) 57 if user is None: 58 LOGGER.debug("Backend returned nothing, continuing", backend=backend_path) 59 continue 60 # Annotate the user object with the path of the backend. 61 user.backend = backend_path 62 LOGGER.info("Successful authentication", user=user.username, backend=backend_path) 63 return user 64 65 # The credentials supplied are invalid to all backends, fire signal 66 login_failed.send( 67 sender=__name__, 68 credentials=_clean_credentials(credentials), 69 request=request, 70 stage=stage, 71 ) 72 73 74class PasswordChallenge(WithUserInfoChallenge): 75 """Password challenge UI fields""" 76 77 recovery_url = CharField(required=False) 78 79 component = CharField(default="ak-stage-password") 80 81 allow_show_password = BooleanField(default=False) 82 83 84class PasswordChallengeResponse(ChallengeResponse): 85 """Password challenge response""" 86 87 component = CharField(default="ak-stage-password") 88 89 password = CharField(trim_whitespace=False) 90 91 def validate_password(self, password: str) -> str | None: 92 """Validate password and authenticate user""" 93 executor = self.stage.executor 94 if PLAN_CONTEXT_PENDING_USER not in executor.plan.context: 95 raise StageInvalidException("No pending user") 96 # Get the pending user's username, which is used as 97 # an Identifier by most authentication backends 98 pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER] 99 auth_kwargs = { 100 "password": password, 101 "username": pending_user.username, 102 } 103 try: 104 with start_span( 105 op="authentik.stages.password.authenticate", 106 name="User authenticate call", 107 ): 108 user = authenticate( 109 self.stage.request, 110 executor.current_stage.backends, 111 executor.current_stage, 112 **auth_kwargs, 113 ) 114 except PermissionDenied as exc: 115 del auth_kwargs["password"] 116 # User was found, but permission was denied (i.e. user is not active) 117 self.stage.logger.debug("Denied access", **auth_kwargs) 118 raise StageInvalidException("Denied access") from exc 119 except ValidationError as exc: 120 del auth_kwargs["password"] 121 # User was found, authentication succeeded, but another signal raised an error 122 # (most likely LDAP) 123 self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs) 124 raise StageInvalidException("Validation error") from exc 125 if not user: 126 # No user was found -> invalid credentials 127 self.stage.logger.info("Invalid credentials") 128 raise ValidationError(_("Invalid password"), "invalid") 129 # User instance returned from authenticate() has .backend property set 130 executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 131 executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend 132 return password 133 134 135class PasswordStageView(ChallengeStageView): 136 """Authentication stage which authenticates against django's AuthBackend""" 137 138 response_class = PasswordChallengeResponse 139 140 def get_challenge(self) -> Challenge: 141 challenge = PasswordChallenge( 142 data={ 143 "allow_show_password": self.executor.current_stage.allow_show_password, 144 } 145 ) 146 recovery_flow: Flow | None = self.request.brand.flow_recovery 147 if recovery_flow: 148 recover_url = reverse( 149 "authentik_core:if-flow", 150 kwargs={"flow_slug": recovery_flow.slug}, 151 ) 152 challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recover_url) 153 if PLAN_CONTEXT_INITIAL_SCORE not in self.executor.plan.context: 154 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score() 155 return challenge 156 157 def get_reputation_score(self) -> int: 158 return ( 159 Reputation.objects.filter(identifier=self.get_pending_user().username).aggregate( 160 total_score=Sum("score") 161 )["total_score"] 162 or 0 163 ) 164 165 def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse: 166 current_stage: PasswordStage = self.executor.current_stage 167 initial_score = self.executor.plan.context.get(PLAN_CONTEXT_INITIAL_SCORE) 168 if initial_score is None: 169 initial_score = self.get_reputation_score() 170 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = initial_score 171 new_score = self.get_reputation_score() 172 if (initial_score - new_score) >= current_stage.failed_attempts_before_cancel: 173 self.logger.debug("User has exceeded maximum tries") 174 return self.executor.stage_invalid(_("Invalid password")) 175 return super().challenge_invalid(response) 176 177 def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse: 178 """Authenticate against django's authentication backend""" 179 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 180 return self.executor.stage_invalid() 181 return self.executor.stage_ok()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_AUTHENTICATION_BACKEND =
'user_backend'
PLAN_CONTEXT_METHOD =
'auth_method'
PLAN_CONTEXT_METHOD_ARGS =
'auth_method_args'
PLAN_CONTEXT_INITIAL_SCORE =
'goauthentik.io/stages/password/initial_score'
def
authenticate( request: django.http.request.HttpRequest, backends: list[str], stage: authentik.flows.models.Stage | None = None, **credentials: Any) -> authentik.core.models.User | None:
40def authenticate( 41 request: HttpRequest, backends: list[str], stage: Stage | None = None, **credentials: Any 42) -> User | None: 43 """If the given credentials are valid, return a User object. 44 45 Customized version of django's authenticate, which accepts a list of backends""" 46 for backend_path in backends: 47 try: 48 backend: BaseBackend = path_to_class(backend_path)() 49 except ImportError: 50 LOGGER.warning("Failed to import backend", path=backend_path) 51 continue 52 LOGGER.debug("Attempting authentication...", backend=backend_path) 53 with start_span( 54 op="authentik.stages.password.authenticate", 55 name=backend_path, 56 ): 57 user = backend.authenticate(request, **credentials) 58 if user is None: 59 LOGGER.debug("Backend returned nothing, continuing", backend=backend_path) 60 continue 61 # Annotate the user object with the path of the backend. 62 user.backend = backend_path 63 LOGGER.info("Successful authentication", user=user.username, backend=backend_path) 64 return user 65 66 # The credentials supplied are invalid to all backends, fire signal 67 login_failed.send( 68 sender=__name__, 69 credentials=_clean_credentials(credentials), 70 request=request, 71 stage=stage, 72 )
If the given credentials are valid, return a User object.
Customized version of django's authenticate, which accepts a list of backends
75class PasswordChallenge(WithUserInfoChallenge): 76 """Password challenge UI fields""" 77 78 recovery_url = CharField(required=False) 79 80 component = CharField(default="ak-stage-password") 81 82 allow_show_password = BooleanField(default=False)
Password challenge UI fields
85class PasswordChallengeResponse(ChallengeResponse): 86 """Password challenge response""" 87 88 component = CharField(default="ak-stage-password") 89 90 password = CharField(trim_whitespace=False) 91 92 def validate_password(self, password: str) -> str | None: 93 """Validate password and authenticate user""" 94 executor = self.stage.executor 95 if PLAN_CONTEXT_PENDING_USER not in executor.plan.context: 96 raise StageInvalidException("No pending user") 97 # Get the pending user's username, which is used as 98 # an Identifier by most authentication backends 99 pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER] 100 auth_kwargs = { 101 "password": password, 102 "username": pending_user.username, 103 } 104 try: 105 with start_span( 106 op="authentik.stages.password.authenticate", 107 name="User authenticate call", 108 ): 109 user = authenticate( 110 self.stage.request, 111 executor.current_stage.backends, 112 executor.current_stage, 113 **auth_kwargs, 114 ) 115 except PermissionDenied as exc: 116 del auth_kwargs["password"] 117 # User was found, but permission was denied (i.e. user is not active) 118 self.stage.logger.debug("Denied access", **auth_kwargs) 119 raise StageInvalidException("Denied access") from exc 120 except ValidationError as exc: 121 del auth_kwargs["password"] 122 # User was found, authentication succeeded, but another signal raised an error 123 # (most likely LDAP) 124 self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs) 125 raise StageInvalidException("Validation error") from exc 126 if not user: 127 # No user was found -> invalid credentials 128 self.stage.logger.info("Invalid credentials") 129 raise ValidationError(_("Invalid password"), "invalid") 130 # User instance returned from authenticate() has .backend property set 131 executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 132 executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend 133 return password
Password challenge response
def
validate_password(self, password: str) -> str | None:
92 def validate_password(self, password: str) -> str | None: 93 """Validate password and authenticate user""" 94 executor = self.stage.executor 95 if PLAN_CONTEXT_PENDING_USER not in executor.plan.context: 96 raise StageInvalidException("No pending user") 97 # Get the pending user's username, which is used as 98 # an Identifier by most authentication backends 99 pending_user: User = executor.plan.context[PLAN_CONTEXT_PENDING_USER] 100 auth_kwargs = { 101 "password": password, 102 "username": pending_user.username, 103 } 104 try: 105 with start_span( 106 op="authentik.stages.password.authenticate", 107 name="User authenticate call", 108 ): 109 user = authenticate( 110 self.stage.request, 111 executor.current_stage.backends, 112 executor.current_stage, 113 **auth_kwargs, 114 ) 115 except PermissionDenied as exc: 116 del auth_kwargs["password"] 117 # User was found, but permission was denied (i.e. user is not active) 118 self.stage.logger.debug("Denied access", **auth_kwargs) 119 raise StageInvalidException("Denied access") from exc 120 except ValidationError as exc: 121 del auth_kwargs["password"] 122 # User was found, authentication succeeded, but another signal raised an error 123 # (most likely LDAP) 124 self.stage.logger.debug("Validation error from signal", exc=exc, **auth_kwargs) 125 raise StageInvalidException("Validation error") from exc 126 if not user: 127 # No user was found -> invalid credentials 128 self.stage.logger.info("Invalid credentials") 129 raise ValidationError(_("Invalid password"), "invalid") 130 # User instance returned from authenticate() has .backend property set 131 executor.plan.context[PLAN_CONTEXT_PENDING_USER] = user 132 executor.plan.context[PLAN_CONTEXT_AUTHENTICATION_BACKEND] = user.backend 133 return password
Validate password and authenticate user
136class PasswordStageView(ChallengeStageView): 137 """Authentication stage which authenticates against django's AuthBackend""" 138 139 response_class = PasswordChallengeResponse 140 141 def get_challenge(self) -> Challenge: 142 challenge = PasswordChallenge( 143 data={ 144 "allow_show_password": self.executor.current_stage.allow_show_password, 145 } 146 ) 147 recovery_flow: Flow | None = self.request.brand.flow_recovery 148 if recovery_flow: 149 recover_url = reverse( 150 "authentik_core:if-flow", 151 kwargs={"flow_slug": recovery_flow.slug}, 152 ) 153 challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recover_url) 154 if PLAN_CONTEXT_INITIAL_SCORE not in self.executor.plan.context: 155 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score() 156 return challenge 157 158 def get_reputation_score(self) -> int: 159 return ( 160 Reputation.objects.filter(identifier=self.get_pending_user().username).aggregate( 161 total_score=Sum("score") 162 )["total_score"] 163 or 0 164 ) 165 166 def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse: 167 current_stage: PasswordStage = self.executor.current_stage 168 initial_score = self.executor.plan.context.get(PLAN_CONTEXT_INITIAL_SCORE) 169 if initial_score is None: 170 initial_score = self.get_reputation_score() 171 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = initial_score 172 new_score = self.get_reputation_score() 173 if (initial_score - new_score) >= current_stage.failed_attempts_before_cancel: 174 self.logger.debug("User has exceeded maximum tries") 175 return self.executor.stage_invalid(_("Invalid password")) 176 return super().challenge_invalid(response) 177 178 def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse: 179 """Authenticate against django's authentication backend""" 180 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 181 return self.executor.stage_invalid() 182 return self.executor.stage_ok()
Authentication stage which authenticates against django's AuthBackend
response_class =
<class 'PasswordChallengeResponse'>
141 def get_challenge(self) -> Challenge: 142 challenge = PasswordChallenge( 143 data={ 144 "allow_show_password": self.executor.current_stage.allow_show_password, 145 } 146 ) 147 recovery_flow: Flow | None = self.request.brand.flow_recovery 148 if recovery_flow: 149 recover_url = reverse( 150 "authentik_core:if-flow", 151 kwargs={"flow_slug": recovery_flow.slug}, 152 ) 153 challenge.initial_data["recovery_url"] = self.request.build_absolute_uri(recover_url) 154 if PLAN_CONTEXT_INITIAL_SCORE not in self.executor.plan.context: 155 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = self.get_reputation_score() 156 return challenge
Return the challenge that the client should solve
def
challenge_invalid( self, response: PasswordChallengeResponse) -> django.http.response.HttpResponse:
166 def challenge_invalid(self, response: PasswordChallengeResponse) -> HttpResponse: 167 current_stage: PasswordStage = self.executor.current_stage 168 initial_score = self.executor.plan.context.get(PLAN_CONTEXT_INITIAL_SCORE) 169 if initial_score is None: 170 initial_score = self.get_reputation_score() 171 self.executor.plan.context[PLAN_CONTEXT_INITIAL_SCORE] = initial_score 172 new_score = self.get_reputation_score() 173 if (initial_score - new_score) >= current_stage.failed_attempts_before_cancel: 174 self.logger.debug("User has exceeded maximum tries") 175 return self.executor.stage_invalid(_("Invalid password")) 176 return super().challenge_invalid(response)
Callback when the challenge has the incorrect format
def
challenge_valid( self, response: PasswordChallengeResponse) -> django.http.response.HttpResponse:
178 def challenge_valid(self, response: PasswordChallengeResponse) -> HttpResponse: 179 """Authenticate against django's authentication backend""" 180 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 181 return self.executor.stage_invalid() 182 return self.executor.stage_ok()
Authenticate against django's authentication backend