authentik.sources.ldap.sync.vendor.freeipa

FreeIPA specific

 1"""FreeIPA specific"""
 2
 3from collections.abc import Generator
 4from datetime import UTC, datetime
 5from typing import Any
 6
 7from authentik.core.models import User
 8from authentik.sources.ldap.models import flatten
 9from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
10
11
class FreeIPA(BaseLDAPSynchronizer):
    """FreeIPA-specific LDAP

    Applies two directory attributes to an already synchronized user:
    ``krbLastPwdChange`` (invalidates the local password) and
    ``nsaccountlock`` (mirrors the lock state into ``user.is_active``).
    """

    @staticmethod
    def name() -> str:
        """Return the fixed identifier string of this synchronizer."""
        return "freeipa"

    def get_objects(self, **kwargs) -> Generator:
        """Yield a single ``None``; the keyword arguments are not used."""
        yield None

    def sync(self, attributes: dict[str, Any], user: User, created: bool):
        """Run the password-change check, then the account-lock check.

        :param attributes: attribute mapping of the LDAP entry
        :param user: user object the checks are applied to
        :param created: forwarded to :meth:`check_pwd_last_set`
        """
        self.check_pwd_last_set(attributes, user, created)
        self.check_nsaccountlock(attributes, user)

    def check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
        """Check krbLastPwdChange

        Marks the user's password as unusable and saves the user when
        ``created`` is true or when ``krbLastPwdChange`` is not older than
        ``user.password_change_date``. Does nothing if the attribute is
        missing from ``attributes``.
        """
        if "krbLastPwdChange" not in attributes:
            return
        # The key is known to exist here, so no fallback value is needed
        pwd_last_set: datetime = attributes["krbLastPwdChange"]
        if pwd_last_set.utcoffset() is None:
            # Naive timestamps are taken to already be in UTC
            pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted rather than relabelled, so a
            # value carrying a non-UTC offset still denotes the same instant
            pwd_last_set = pwd_last_set.astimezone(UTC)
        if created or pwd_last_set >= user.password_change_date:
            self._task.info(f"'{user.username}': Reset user's password")
            self._logger.debug(
                "Reset user's password",
                user=user.username,
                created=created,
                pwd_last_set=pwd_last_set,
            )
            user.set_unusable_password()
            user.save()

    def check_nsaccountlock(self, attributes: dict[str, Any], user: User):
        """https://www.port389.org/docs/389ds/howto/howto-account-inactivation.html

        Sets ``user.is_active`` to the inverse of ``nsaccountlock`` and saves
        the user, but only when the value actually changes. Does nothing if
        the attribute is missing from ``attributes``.
        """
        # This is more of a 389-ds quirk rather than FreeIPA, but FreeIPA uses
        # 389-ds and this will trigger regardless
        if "nsaccountlock" not in attributes:
            return
        # For some reason, nsaccountlock is not defined properly in the schema as bool
        # hence we get it as a list of strings
        raw_lock = flatten(attributes.get("nsaccountlock", ["FALSE"]))
        # So we have to attempt to convert it to a bool
        locked = str(raw_lock).lower() == "true"
        # And then invert it since freeipa saves locked and we save active
        active = not locked
        if active != user.is_active:
            user.is_active = active
            user.save()
class FreeIPA(BaseLDAPSynchronizer):
    """FreeIPA-specific LDAP

    Applies two directory attributes to an already synchronized user:
    ``krbLastPwdChange`` (invalidates the local password) and
    ``nsaccountlock`` (mirrors the lock state into ``user.is_active``).
    """

    @staticmethod
    def name() -> str:
        """Return the fixed identifier string of this synchronizer."""
        return "freeipa"

    def get_objects(self, **kwargs) -> Generator:
        """Yield a single ``None``; the keyword arguments are not used."""
        yield None

    def sync(self, attributes: dict[str, Any], user: User, created: bool):
        """Run the password-change check, then the account-lock check.

        :param attributes: attribute mapping of the LDAP entry
        :param user: user object the checks are applied to
        :param created: forwarded to :meth:`check_pwd_last_set`
        """
        self.check_pwd_last_set(attributes, user, created)
        self.check_nsaccountlock(attributes, user)

    def check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
        """Check krbLastPwdChange

        Marks the user's password as unusable and saves the user when
        ``created`` is true or when ``krbLastPwdChange`` is not older than
        ``user.password_change_date``. Does nothing if the attribute is
        missing from ``attributes``.
        """
        if "krbLastPwdChange" not in attributes:
            return
        # The key is known to exist here, so no fallback value is needed
        pwd_last_set: datetime = attributes["krbLastPwdChange"]
        if pwd_last_set.utcoffset() is None:
            # Naive timestamps are taken to already be in UTC
            pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
        else:
            # Aware timestamps are converted rather than relabelled, so a
            # value carrying a non-UTC offset still denotes the same instant
            pwd_last_set = pwd_last_set.astimezone(UTC)
        if created or pwd_last_set >= user.password_change_date:
            self._task.info(f"'{user.username}': Reset user's password")
            self._logger.debug(
                "Reset user's password",
                user=user.username,
                created=created,
                pwd_last_set=pwd_last_set,
            )
            user.set_unusable_password()
            user.save()

    def check_nsaccountlock(self, attributes: dict[str, Any], user: User):
        """https://www.port389.org/docs/389ds/howto/howto-account-inactivation.html

        Sets ``user.is_active`` to the inverse of ``nsaccountlock`` and saves
        the user, but only when the value actually changes. Does nothing if
        the attribute is missing from ``attributes``.
        """
        # This is more of a 389-ds quirk rather than FreeIPA, but FreeIPA uses
        # 389-ds and this will trigger regardless
        if "nsaccountlock" not in attributes:
            return
        # For some reason, nsaccountlock is not defined properly in the schema as bool
        # hence we get it as a list of strings
        raw_lock = flatten(attributes.get("nsaccountlock", ["FALSE"]))
        # So we have to attempt to convert it to a bool
        locked = str(raw_lock).lower() == "true"
        # And then invert it since freeipa saves locked and we save active
        active = not locked
        if active != user.is_active:
            user.is_active = active
            user.save()

FreeIPA-specific LDAP

@staticmethod
def name() -> str:
16    @staticmethod
17    def name() -> str:
18        return "freeipa"

UI name for the type of object this class synchronizes

def get_objects(self, **kwargs) -> Generator:
20    def get_objects(self, **kwargs) -> Generator:
21        yield None

Get objects from LDAP, implemented in subclass

def sync( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
23    def sync(self, attributes: dict[str, Any], user: User, created: bool):
24        self.check_pwd_last_set(attributes, user, created)
25        self.check_nsaccountlock(attributes, user)

Sync function, implemented in subclass

def check_pwd_last_set( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
27    def check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
28        """Check krbLastPwdChange"""
29        if "krbLastPwdChange" not in attributes:
30            return
31        pwd_last_set: datetime = attributes.get("krbLastPwdChange", datetime.now())
32        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
33        if created or pwd_last_set >= user.password_change_date:
34            self._task.info(f"'{user.username}': Reset user's password")
35            self._logger.debug(
36                "Reset user's password",
37                user=user.username,
38                created=created,
39                pwd_last_set=pwd_last_set,
40            )
41            user.set_unusable_password()
42            user.save()

Check krbLastPwdChange

def check_nsaccountlock( self, attributes: dict[str, typing.Any], user: authentik.core.models.User):
44    def check_nsaccountlock(self, attributes: dict[str, Any], user: User):
45        """https://www.port389.org/docs/389ds/howto/howto-account-inactivation.html"""
46        # This is more of a 389-ds quirk rather than FreeIPA, but FreeIPA uses
47        # 389-ds and this will trigger regardless
48        if "nsaccountlock" not in attributes:
49            return
50        # For some reason, nsaccountlock is not defined properly in the schema as bool
51        # hence we get it as a list of strings
52        _is_locked = str(flatten(attributes.get("nsaccountlock", ["FALSE"])))
53        # So we have to attempt to convert it to a bool
54        is_locked = _is_locked.lower() == "true"
55        # And then invert it since freeipa saves locked and we save active
56        is_active = not is_locked
57        if is_active != user.is_active:
58            user.is_active = is_active
59            user.save()