authentik.sources.ldap.sync.vendor.ms_ad

Active Directory specific

 1"""Active Directory specific"""
 2
 3from collections.abc import Generator
 4from datetime import UTC, datetime
 5from enum import IntFlag
 6from typing import Any
 7
 8from authentik.core.models import User
 9from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
10
11
12class UserAccountControl(IntFlag):
13    """UserAccountControl attribute for Active directory users"""
14
15    # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
16    #   /useraccountcontrol-manipulate-account-properties
17
18    SCRIPT = 1
19    ACCOUNTDISABLE = 2
20    HOMEDIR_REQUIRED = 8
21    LOCKOUT = 16
22    PASSWD_NOTREQD = 32
23    PASSWD_CANT_CHANGE = 64
24    ENCRYPTED_TEXT_PWD_ALLOWED = 128
25    TEMP_DUPLICATE_ACCOUNT = 256
26    NORMAL_ACCOUNT = 512
27    INTERDOMAIN_TRUST_ACCOUNT = 2048
28    WORKSTATION_TRUST_ACCOUNT = 4096
29    SERVER_TRUST_ACCOUNT = 8192
30    DONT_EXPIRE_PASSWORD = 65536
31    MNS_LOGON_ACCOUNT = 131072
32    SMARTCARD_REQUIRED = 262144
33    TRUSTED_FOR_DELEGATION = 524288
34    NOT_DELEGATED = 1048576
35    USE_DES_KEY_ONLY = 2097152
36    DONT_REQ_PREAUTH = 4194304
37    PASSWORD_EXPIRED = 8388608
38    TRUSTED_TO_AUTH_FOR_DELEGATION = 16777216
39    PARTIAL_SECRETS_ACCOUNT = 67108864
40
41
42class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
43    """Microsoft-specific LDAP"""
44
45    @staticmethod
46    def name() -> str:
47        return "microsoft_ad"
48
49    def get_objects(self, **kwargs) -> Generator:
50        yield None
51
52    def sync(self, attributes: dict[str, Any], user: User, created: bool):
53        self.ms_check_pwd_last_set(attributes, user, created)
54        self.ms_check_uac(attributes, user)
55
56    def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
57        """Check pwdLastSet"""
58        if "pwdLastSet" not in attributes:
59            return
60        pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
61        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
62        if created or pwd_last_set >= user.password_change_date:
63            self._task.info(f"'{user.username}': Reset user's password")
64            self._logger.debug(
65                "Reset user's password",
66                user=user.username,
67                created=created,
68                pwd_last_set=pwd_last_set,
69            )
70            user.set_unusable_password()
71            user.save()
72
73    def ms_check_uac(self, attributes: dict[str, Any], user: User):
74        """Check userAccountControl"""
75        if "userAccountControl" not in attributes:
76            return
77        # Default from https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
78        #   /useraccountcontrol-manipulate-account-properties
79        uac_bit = attributes.get("userAccountControl", 512)
80        uac = UserAccountControl(uac_bit)
81        is_active = (
82            UserAccountControl.ACCOUNTDISABLE not in uac and UserAccountControl.LOCKOUT not in uac
83        )
84        if is_active != user.is_active:
85            user.is_active = is_active
86            user.save()
class UserAccountControl(enum.IntFlag):
13class UserAccountControl(IntFlag):
14    """UserAccountControl attribute for Active directory users"""
15
16    # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
17    #   /useraccountcontrol-manipulate-account-properties
18
19    SCRIPT = 1
20    ACCOUNTDISABLE = 2
21    HOMEDIR_REQUIRED = 8
22    LOCKOUT = 16
23    PASSWD_NOTREQD = 32
24    PASSWD_CANT_CHANGE = 64
25    ENCRYPTED_TEXT_PWD_ALLOWED = 128
26    TEMP_DUPLICATE_ACCOUNT = 256
27    NORMAL_ACCOUNT = 512
28    INTERDOMAIN_TRUST_ACCOUNT = 2048
29    WORKSTATION_TRUST_ACCOUNT = 4096
30    SERVER_TRUST_ACCOUNT = 8192
31    DONT_EXPIRE_PASSWORD = 65536
32    MNS_LOGON_ACCOUNT = 131072
33    SMARTCARD_REQUIRED = 262144
34    TRUSTED_FOR_DELEGATION = 524288
35    NOT_DELEGATED = 1048576
36    USE_DES_KEY_ONLY = 2097152
37    DONT_REQ_PREAUTH = 4194304
38    PASSWORD_EXPIRED = 8388608
39    TRUSTED_TO_AUTH_FOR_DELEGATION = 16777216
40    PARTIAL_SECRETS_ACCOUNT = 67108864

UserAccountControl attribute for Active directory users

ACCOUNTDISABLE = <UserAccountControl.ACCOUNTDISABLE: 2>
HOMEDIR_REQUIRED = <UserAccountControl.HOMEDIR_REQUIRED: 8>
LOCKOUT = <UserAccountControl.LOCKOUT: 16>
PASSWD_NOTREQD = <UserAccountControl.PASSWD_NOTREQD: 32>
PASSWD_CANT_CHANGE = <UserAccountControl.PASSWD_CANT_CHANGE: 64>
ENCRYPTED_TEXT_PWD_ALLOWED = <UserAccountControl.ENCRYPTED_TEXT_PWD_ALLOWED: 128>
TEMP_DUPLICATE_ACCOUNT = <UserAccountControl.TEMP_DUPLICATE_ACCOUNT: 256>
NORMAL_ACCOUNT = <UserAccountControl.NORMAL_ACCOUNT: 512>
INTERDOMAIN_TRUST_ACCOUNT = <UserAccountControl.INTERDOMAIN_TRUST_ACCOUNT: 2048>
WORKSTATION_TRUST_ACCOUNT = <UserAccountControl.WORKSTATION_TRUST_ACCOUNT: 4096>
SERVER_TRUST_ACCOUNT = <UserAccountControl.SERVER_TRUST_ACCOUNT: 8192>
DONT_EXPIRE_PASSWORD = <UserAccountControl.DONT_EXPIRE_PASSWORD: 65536>
MNS_LOGON_ACCOUNT = <UserAccountControl.MNS_LOGON_ACCOUNT: 131072>
SMARTCARD_REQUIRED = <UserAccountControl.SMARTCARD_REQUIRED: 262144>
TRUSTED_FOR_DELEGATION = <UserAccountControl.TRUSTED_FOR_DELEGATION: 524288>
NOT_DELEGATED = <UserAccountControl.NOT_DELEGATED: 1048576>
USE_DES_KEY_ONLY = <UserAccountControl.USE_DES_KEY_ONLY: 2097152>
DONT_REQ_PREAUTH = <UserAccountControl.DONT_REQ_PREAUTH: 4194304>
PASSWORD_EXPIRED = <UserAccountControl.PASSWORD_EXPIRED: 8388608>
TRUSTED_TO_AUTH_FOR_DELEGATION = <UserAccountControl.TRUSTED_TO_AUTH_FOR_DELEGATION: 16777216>
PARTIAL_SECRETS_ACCOUNT = <UserAccountControl.PARTIAL_SECRETS_ACCOUNT: 67108864>
class MicrosoftActiveDirectory(authentik.sources.ldap.sync.base.BaseLDAPSynchronizer):
43class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
44    """Microsoft-specific LDAP"""
45
46    @staticmethod
47    def name() -> str:
48        return "microsoft_ad"
49
50    def get_objects(self, **kwargs) -> Generator:
51        yield None
52
53    def sync(self, attributes: dict[str, Any], user: User, created: bool):
54        self.ms_check_pwd_last_set(attributes, user, created)
55        self.ms_check_uac(attributes, user)
56
57    def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
58        """Check pwdLastSet"""
59        if "pwdLastSet" not in attributes:
60            return
61        pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
62        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
63        if created or pwd_last_set >= user.password_change_date:
64            self._task.info(f"'{user.username}': Reset user's password")
65            self._logger.debug(
66                "Reset user's password",
67                user=user.username,
68                created=created,
69                pwd_last_set=pwd_last_set,
70            )
71            user.set_unusable_password()
72            user.save()
73
74    def ms_check_uac(self, attributes: dict[str, Any], user: User):
75        """Check userAccountControl"""
76        if "userAccountControl" not in attributes:
77            return
78        # Default from https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
79        #   /useraccountcontrol-manipulate-account-properties
80        uac_bit = attributes.get("userAccountControl", 512)
81        uac = UserAccountControl(uac_bit)
82        is_active = (
83            UserAccountControl.ACCOUNTDISABLE not in uac and UserAccountControl.LOCKOUT not in uac
84        )
85        if is_active != user.is_active:
86            user.is_active = is_active
87            user.save()

Microsoft-specific LDAP

@staticmethod
def name() -> str:
46    @staticmethod
47    def name() -> str:
48        return "microsoft_ad"

UI name for the type of object this class synchronizes

def get_objects(self, **kwargs) -> Generator:
50    def get_objects(self, **kwargs) -> Generator:
51        yield None

Get objects from LDAP, implemented in subclass

def sync( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
53    def sync(self, attributes: dict[str, Any], user: User, created: bool):
54        self.ms_check_pwd_last_set(attributes, user, created)
55        self.ms_check_uac(attributes, user)

Sync function, implemented in subclass

def ms_check_pwd_last_set( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
57    def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
58        """Check pwdLastSet"""
59        if "pwdLastSet" not in attributes:
60            return
61        pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now())
62        pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
63        if created or pwd_last_set >= user.password_change_date:
64            self._task.info(f"'{user.username}': Reset user's password")
65            self._logger.debug(
66                "Reset user's password",
67                user=user.username,
68                created=created,
69                pwd_last_set=pwd_last_set,
70            )
71            user.set_unusable_password()
72            user.save()

Check pwdLastSet

def ms_check_uac( self, attributes: dict[str, typing.Any], user: authentik.core.models.User):
74    def ms_check_uac(self, attributes: dict[str, Any], user: User):
75        """Check userAccountControl"""
76        if "userAccountControl" not in attributes:
77            return
78        # Default from https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
79        #   /useraccountcontrol-manipulate-account-properties
80        uac_bit = attributes.get("userAccountControl", 512)
81        uac = UserAccountControl(uac_bit)
82        is_active = (
83            UserAccountControl.ACCOUNTDISABLE not in uac and UserAccountControl.LOCKOUT not in uac
84        )
85        if is_active != user.is_active:
86            user.is_active = is_active
87            user.save()

Check userAccountControl