authentik.sources.ldap.signals
authentik ldap source signals
"""authentik ldap source signals"""

from typing import Any

from django.dispatch import receiver
from django.utils.translation import gettext_lazy as _
from ldap3.core.exceptions import LDAPOperationResult
from rest_framework.serializers import ValidationError
from structlog.stdlib import get_logger

from authentik.core.models import User
from authentik.core.signals import password_changed
from authentik.events.models import Event, EventAction
from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
from authentik.sources.ldap.models import LDAPSource
from authentik.sources.ldap.password import LDAPPasswordChanger
from authentik.stages.prompt.signals import password_validate

LOGGER = get_logger()


@receiver(password_validate)
def ldap_password_validate(sender, password: str, plan_context: dict[str, Any], **__):
    """Check a candidate password against the password-syncing LDAP source, if any.

    Only the first enabled LDAP source with ``sync_users_password`` set is
    consulted. Nothing happens when no such source exists, when the pending
    user is excluded by ``LDAPPasswordChanger.should_check_user``, or when the
    source does not have the AD password complexity check enabled.

    Raises:
        ValidationError: the complexity check is enabled and the password
            does not pass ``ad_password_complexity``.
    """
    # Fetch with a single ``first()`` rather than ``exists()`` followed by
    # ``first()``: one query instead of two, and no window in which the source
    # can vanish between the two calls and leave ``source`` as ``None``.
    source = LDAPSource.objects.filter(sync_users_password=True, enabled=True).first()
    if source is None:
        return
    # The plan context may not carry a pending user; the complexity check then
    # runs with ``user`` set to ``None``.
    user = plan_context.get(PLAN_CONTEXT_PENDING_USER)
    if user and not LDAPPasswordChanger.should_check_user(user):
        return
    changer = LDAPPasswordChanger(source)
    if not changer.check_ad_password_complexity_enabled():
        return
    if not changer.ad_password_complexity(password, user):
        raise ValidationError(_("Password does not match Active Directory Complexity."))


@receiver(password_changed)
def ldap_sync_password(sender, user: User, password: str, **_):
    """Write a changed password to the password-syncing LDAP source, if any.

    Only the first enabled LDAP source with ``sync_users_password`` set is
    used. Nothing happens when no such source exists, when ``sender`` has the
    same ``pk`` as that source, or when the user is excluded by
    ``LDAPPasswordChanger.should_check_user``.

    Raises:
        ValidationError: the LDAP server rejected the change
            (``LDAPOperationResult``); a ``CONFIGURATION_ERROR`` event is
            saved for the user before raising.
    """
    # NOTE: the ``**_`` catch-all shadows the module-level gettext alias ``_``
    # inside this function, so ``_()`` cannot be used for translation here.
    #
    # Fetch with a single ``first()`` rather than ``exists()`` followed by
    # ``first()``: one query instead of two, and no window in which the source
    # can vanish between the two calls and make ``source.pk`` fail on ``None``.
    source = LDAPSource.objects.filter(sync_users_password=True, enabled=True).first()
    if source is None:
        return
    # Skip when the signal was sent by this very source
    # (presumably to avoid writing a password back to where it came from —
    # TODO confirm against the senders of ``password_changed``).
    if source.pk == getattr(sender, "pk", None):
        return
    if not LDAPPasswordChanger.should_check_user(user):
        return
    try:
        LDAPPasswordChanger(source).change_password(user, password)
    except LDAPOperationResult as exc:
        LOGGER.warning("failed to set LDAP password", exc=exc)
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=(
                "Failed to change password in LDAP source due to remote error: "
                f"{exc.result}, {exc.message}, {exc.description}"
            ),
            source=source,
        ).set_user(user).save()
        raise ValidationError("Failed to set password") from exc
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(password_validate)
def
ldap_password_validate(sender, password: str, plan_context: dict[str, typing.Any], **__):
@receiver(password_validate)
def ldap_password_validate(sender, password: str, plan_context: dict[str, Any], **__):
    """Check a candidate password against the password-syncing LDAP source, if any.

    Only the first enabled LDAP source with ``sync_users_password`` set is
    consulted. Nothing happens when no such source exists, when the pending
    user is excluded by ``LDAPPasswordChanger.should_check_user``, or when the
    source does not have the AD password complexity check enabled.

    Raises:
        ValidationError: the complexity check is enabled and the password
            does not pass ``ad_password_complexity``.
    """
    # Fetch with a single ``first()`` rather than ``exists()`` followed by
    # ``first()``: one query instead of two, and no window in which the source
    # can vanish between the two calls and leave ``source`` as ``None``.
    source = LDAPSource.objects.filter(sync_users_password=True, enabled=True).first()
    if source is None:
        return
    # The plan context may not carry a pending user; the complexity check then
    # runs with ``user`` set to ``None``.
    user = plan_context.get(PLAN_CONTEXT_PENDING_USER)
    if user and not LDAPPasswordChanger.should_check_user(user):
        return
    changer = LDAPPasswordChanger(source)
    if not changer.check_ad_password_complexity_enabled():
        return
    if not changer.ad_password_complexity(password, user):
        raise ValidationError(_("Password does not match Active Directory Complexity."))
If there is an enabled LDAP source with password sync turned on, check the password against its Active Directory complexity rules.
@receiver(password_changed)
def
ldap_sync_password(sender, user: authentik.core.models.User, password: str, **_):
@receiver(password_changed)
def ldap_sync_password(sender, user: User, password: str, **_):
    """Write a changed password to the password-syncing LDAP source, if any.

    Only the first enabled LDAP source with ``sync_users_password`` set is
    used. Nothing happens when no such source exists, when ``sender`` has the
    same ``pk`` as that source, or when the user is excluded by
    ``LDAPPasswordChanger.should_check_user``.

    Raises:
        ValidationError: the LDAP server rejected the change
            (``LDAPOperationResult``); a ``CONFIGURATION_ERROR`` event is
            saved for the user before raising.
    """
    # NOTE: the ``**_`` catch-all shadows the module-level gettext alias ``_``
    # inside this function, so ``_()`` cannot be used for translation here.
    #
    # Fetch with a single ``first()`` rather than ``exists()`` followed by
    # ``first()``: one query instead of two, and no window in which the source
    # can vanish between the two calls and make ``source.pk`` fail on ``None``.
    source = LDAPSource.objects.filter(sync_users_password=True, enabled=True).first()
    if source is None:
        return
    # Skip when the signal was sent by this very source
    # (presumably to avoid writing a password back to where it came from —
    # TODO confirm against the senders of ``password_changed``).
    if source.pk == getattr(sender, "pk", None):
        return
    if not LDAPPasswordChanger.should_check_user(user):
        return
    try:
        LDAPPasswordChanger(source).change_password(user, password)
    except LDAPOperationResult as exc:
        LOGGER.warning("failed to set LDAP password", exc=exc)
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=(
                "Failed to change password in LDAP source due to remote error: "
                f"{exc.result}, {exc.message}, {exc.description}"
            ),
            source=source,
        ).set_user(user).save()
        raise ValidationError("Failed to set password") from exc
Connect to LDAP and update the password.