authentik.sources.ldap.signals

authentik ldap source signals

 1"""authentik ldap source signals"""
 2
 3from typing import Any
 4
 5from django.dispatch import receiver
 6from django.utils.translation import gettext_lazy as _
 7from ldap3.core.exceptions import LDAPOperationResult
 8from rest_framework.serializers import ValidationError
 9from structlog.stdlib import get_logger
10
11from authentik.core.models import User
12from authentik.core.signals import password_changed
13from authentik.events.models import Event, EventAction
14from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
15from authentik.sources.ldap.models import LDAPSource
16from authentik.sources.ldap.password import LDAPPasswordChanger
17from authentik.stages.prompt.signals import password_validate
18
19LOGGER = get_logger()
20
21
22@receiver(password_validate)
23def ldap_password_validate(sender, password: str, plan_context: dict[str, Any], **__):
24    """if there's an LDAP Source with enabled password sync, check the password"""
25    sources = LDAPSource.objects.filter(sync_users_password=True, enabled=True)
26    if not sources.exists():
27        return
28    source = sources.first()
29    user = plan_context.get(PLAN_CONTEXT_PENDING_USER, None)
30    if user and not LDAPPasswordChanger.should_check_user(user):
31        return
32    changer = LDAPPasswordChanger(source)
33    if changer.check_ad_password_complexity_enabled():
34        passing = changer.ad_password_complexity(password, user)
35        if not passing:
36            raise ValidationError(_("Password does not match Active Directory Complexity."))
37
38
39@receiver(password_changed)
40def ldap_sync_password(sender, user: User, password: str, **_):
41    """Connect to ldap and update password."""
42    sources = LDAPSource.objects.filter(sync_users_password=True, enabled=True)
43    if not sources.exists():
44        return
45    source = sources.first()
46    if source.pk == getattr(sender, "pk", None):
47        return
48    if not LDAPPasswordChanger.should_check_user(user):
49        return
50    try:
51        changer = LDAPPasswordChanger(source)
52        changer.change_password(user, password)
53    except LDAPOperationResult as exc:
54        LOGGER.warning("failed to set LDAP password", exc=exc)
55        Event.new(
56            EventAction.CONFIGURATION_ERROR,
57            message=(
58                "Failed to change password in LDAP source due to remote error: "
59                f"{exc.result}, {exc.message}, {exc.description}"
60            ),
61            source=source,
62        ).set_user(user).save()
63        raise ValidationError("Failed to set password") from exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(password_validate)
def ldap_password_validate(sender, password: str, plan_context: dict[str, typing.Any], **__):
23@receiver(password_validate)
24def ldap_password_validate(sender, password: str, plan_context: dict[str, Any], **__):
25    """if there's an LDAP Source with enabled password sync, check the password"""
26    sources = LDAPSource.objects.filter(sync_users_password=True, enabled=True)
27    if not sources.exists():
28        return
29    source = sources.first()
30    user = plan_context.get(PLAN_CONTEXT_PENDING_USER, None)
31    if user and not LDAPPasswordChanger.should_check_user(user):
32        return
33    changer = LDAPPasswordChanger(source)
34    if changer.check_ad_password_complexity_enabled():
35        passing = changer.ad_password_complexity(password, user)
36        if not passing:
37            raise ValidationError(_("Password does not match Active Directory Complexity."))

if there's an LDAP Source with enabled password sync, check the password

@receiver(password_changed)
def ldap_sync_password(sender, user: authentik.core.models.User, password: str, **_):
40@receiver(password_changed)
41def ldap_sync_password(sender, user: User, password: str, **_):
42    """Connect to ldap and update password."""
43    sources = LDAPSource.objects.filter(sync_users_password=True, enabled=True)
44    if not sources.exists():
45        return
46    source = sources.first()
47    if source.pk == getattr(sender, "pk", None):
48        return
49    if not LDAPPasswordChanger.should_check_user(user):
50        return
51    try:
52        changer = LDAPPasswordChanger(source)
53        changer.change_password(user, password)
54    except LDAPOperationResult as exc:
55        LOGGER.warning("failed to set LDAP password", exc=exc)
56        Event.new(
57            EventAction.CONFIGURATION_ERROR,
58            message=(
59                "Failed to change password in LDAP source due to remote error: "
60                f"{exc.result}, {exc.message}, {exc.description}"
61            ),
62            source=source,
63        ).set_user(user).save()
64        raise ValidationError("Failed to set password") from exc

Connect to ldap and update password.