authentik.sources.kerberos.signals

authentik kerberos source signals

 1"""authentik kerberos source signals"""
 2
 3from django.dispatch import receiver
 4from kadmin import exceptions as kadmin_exceptions
 5from rest_framework.serializers import ValidationError
 6from structlog.stdlib import get_logger
 7
 8from authentik.core.models import User
 9from authentik.core.signals import password_changed
10from authentik.events.models import Event, EventAction
11from authentik.sources.kerberos.models import (
12    Krb5ConfContext,
13    UserKerberosSourceConnection,
14)
15
16LOGGER = get_logger()
17
18
19@receiver(password_changed)
20def kerberos_sync_password(sender, user: User, password: str, **_):
21    """Connect to kerberos and update password."""
22    user_source_connections = UserKerberosSourceConnection.objects.select_related(
23        "source__kerberossource"
24    ).filter(
25        user=user,
26        source__enabled=True,
27        source__kerberossource__sync_users=True,
28        source__kerberossource__sync_users_password=True,
29    )
30    for user_source_connection in user_source_connections:
31        source = user_source_connection.source.kerberossource
32        if source.pk == getattr(sender, "pk", None):
33            continue
34        with Krb5ConfContext(source):
35            try:
36                kadm = source.connection()
37                kadm.get_principal(user_source_connection.identifier).change_password(
38                    kadm,
39                    password,
40                )
41            except kadmin_exceptions.PyKAdminException as exc:
42                LOGGER.warning("failed to set Kerberos password", exc=exc, source=source)
43                Event.new(
44                    EventAction.CONFIGURATION_ERROR,
45                    message=(
46                        f"Failed to change password in Kerberos source due to remote error: {exc}"
47                    ),
48                    source=source,
49                ).set_user(user).save()
50                raise ValidationError("Failed to set password") from exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
@receiver(password_changed)
def kerberos_sync_password(sender, user: authentik.core.models.User, password: str, **_):
20@receiver(password_changed)
21def kerberos_sync_password(sender, user: User, password: str, **_):
22    """Connect to kerberos and update password."""
23    user_source_connections = UserKerberosSourceConnection.objects.select_related(
24        "source__kerberossource"
25    ).filter(
26        user=user,
27        source__enabled=True,
28        source__kerberossource__sync_users=True,
29        source__kerberossource__sync_users_password=True,
30    )
31    for user_source_connection in user_source_connections:
32        source = user_source_connection.source.kerberossource
33        if source.pk == getattr(sender, "pk", None):
34            continue
35        with Krb5ConfContext(source):
36            try:
37                kadm = source.connection()
38                kadm.get_principal(user_source_connection.identifier).change_password(
39                    kadm,
40                    password,
41                )
42            except kadmin_exceptions.PyKAdminException as exc:
43                LOGGER.warning("failed to set Kerberos password", exc=exc, source=source)
44                Event.new(
45                    EventAction.CONFIGURATION_ERROR,
46                    message=(
47                        f"Failed to change password in Kerberos source due to remote error: {exc}"
48                    ),
49                    source=source,
50                ).set_user(user).save()
51                raise ValidationError("Failed to set password") from exc

Connect to kerberos and update password.