authentik.sources.ldap.sync.vendor.ms_ad
Active Directory specific
"""Active Directory specific"""

from collections.abc import Generator
from datetime import UTC, datetime
from enum import IntFlag
from typing import Any

from authentik.core.models import User
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer


class UserAccountControl(IntFlag):
    """Bit flags of the userAccountControl attribute of Active Directory users"""

    # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
    # /useraccountcontrol-manipulate-account-properties
    # Values are written in hexadecimal so that each flag reads as one bit.

    SCRIPT = 0x0001
    ACCOUNTDISABLE = 0x0002
    HOMEDIR_REQUIRED = 0x0008
    LOCKOUT = 0x0010
    PASSWD_NOTREQD = 0x0020
    PASSWD_CANT_CHANGE = 0x0040
    ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080
    TEMP_DUPLICATE_ACCOUNT = 0x0100
    NORMAL_ACCOUNT = 0x0200
    INTERDOMAIN_TRUST_ACCOUNT = 0x0800
    WORKSTATION_TRUST_ACCOUNT = 0x1000
    SERVER_TRUST_ACCOUNT = 0x2000
    DONT_EXPIRE_PASSWORD = 0x10000
    MNS_LOGON_ACCOUNT = 0x20000
    SMARTCARD_REQUIRED = 0x40000
    TRUSTED_FOR_DELEGATION = 0x80000
    NOT_DELEGATED = 0x100000
    USE_DES_KEY_ONLY = 0x200000
    DONT_REQ_PREAUTH = 0x400000
    PASSWORD_EXPIRED = 0x800000
    TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000
    PARTIAL_SECRETS_ACCOUNT = 0x4000000


class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
    """Microsoft-specific LDAP

    Applies the ``pwdLastSet`` and ``userAccountControl`` attributes of a
    directory entry to the matching user.
    """

    @staticmethod
    def name() -> str:
        """Return the name of this synchronizer."""
        return "microsoft_ad"

    def get_objects(self, **kwargs) -> Generator:
        """Yield a single ``None``."""
        yield None

    def sync(self, attributes: dict[str, Any], user: User, created: bool):
        """Run both Active Directory checks for one user.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
            created: Whether ``user`` was created by the current sync.
        """
        self.ms_check_pwd_last_set(attributes, user, created)
        self.ms_check_uac(attributes, user)

    def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
        """Make the local password unusable when ``pwdLastSet`` calls for it.

        Returns without touching ``user`` when ``attributes`` has no
        ``pwdLastSet`` key or its value is not a ``datetime``. Otherwise the
        password is made unusable and ``user`` is saved if ``created`` is true
        or the timestamp is at or after ``user.password_change_date``.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
            created: Whether ``user`` was created by the current sync.
        """
        if "pwdLastSet" not in attributes:
            return
        pwd_last_set = attributes["pwdLastSet"]
        if not isinstance(pwd_last_set, datetime):
            # NOTE(review): presumably the LDAP client hands back the raw value
            # when it cannot parse the timestamp -- confirm. Such a value
            # cannot be compared to a datetime, so the check is skipped.
            self._logger.debug(
                "Ignoring pwdLastSet that is not a datetime",
                user=user.username,
                pwd_last_set=pwd_last_set,
            )
            return
        if pwd_last_set.utcoffset() is None:
            # Naive value: label it as UTC, as this check has always done.
            pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
        else:
            # Aware value: convert it, so the instant it denotes is preserved.
            pwd_last_set = pwd_last_set.astimezone(UTC)
        if created or pwd_last_set >= user.password_change_date:
            self._task.info(f"'{user.username}': Reset user's password")
            self._logger.debug(
                "Reset user's password",
                user=user.username,
                created=created,
                pwd_last_set=pwd_last_set,
            )
            user.set_unusable_password()
            user.save()

    def ms_check_uac(self, attributes: dict[str, Any], user: User):
        """Mirror the ``userAccountControl`` flags onto ``user.is_active``.

        Returns without touching ``user`` when ``attributes`` has no
        ``userAccountControl`` key. The user counts as active when neither
        ``ACCOUNTDISABLE`` nor ``LOCKOUT`` is set, and is saved only when that
        differs from the current ``user.is_active``.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
        """
        if "userAccountControl" not in attributes:
            return
        # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
        # /useraccountcontrol-manipulate-account-properties
        # int() lets a numeric string through; integers pass unchanged.
        uac = UserAccountControl(int(attributes["userAccountControl"]))
        blocking = UserAccountControl.ACCOUNTDISABLE | UserAccountControl.LOCKOUT
        is_active = not uac & blocking
        if is_active != user.is_active:
            user.is_active = is_active
            user.save()
class
UserAccountControl(enum.IntFlag):
class UserAccountControl(IntFlag):
    """Bit flags of the userAccountControl attribute of Active Directory users"""

    # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
    # /useraccountcontrol-manipulate-account-properties
    # Values are written in hexadecimal so that each flag reads as one bit.

    SCRIPT = 0x0001
    ACCOUNTDISABLE = 0x0002
    HOMEDIR_REQUIRED = 0x0008
    LOCKOUT = 0x0010
    PASSWD_NOTREQD = 0x0020
    PASSWD_CANT_CHANGE = 0x0040
    ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080
    TEMP_DUPLICATE_ACCOUNT = 0x0100
    NORMAL_ACCOUNT = 0x0200
    INTERDOMAIN_TRUST_ACCOUNT = 0x0800
    WORKSTATION_TRUST_ACCOUNT = 0x1000
    SERVER_TRUST_ACCOUNT = 0x2000
    DONT_EXPIRE_PASSWORD = 0x10000
    MNS_LOGON_ACCOUNT = 0x20000
    SMARTCARD_REQUIRED = 0x40000
    TRUSTED_FOR_DELEGATION = 0x80000
    NOT_DELEGATED = 0x100000
    USE_DES_KEY_ONLY = 0x200000
    DONT_REQ_PREAUTH = 0x400000
    PASSWORD_EXPIRED = 0x800000
    TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000
    PARTIAL_SECRETS_ACCOUNT = 0x4000000
UserAccountControl attribute for Active Directory users
SCRIPT =
<UserAccountControl.SCRIPT: 1>
ACCOUNTDISABLE =
<UserAccountControl.ACCOUNTDISABLE: 2>
HOMEDIR_REQUIRED =
<UserAccountControl.HOMEDIR_REQUIRED: 8>
LOCKOUT =
<UserAccountControl.LOCKOUT: 16>
PASSWD_NOTREQD =
<UserAccountControl.PASSWD_NOTREQD: 32>
PASSWD_CANT_CHANGE =
<UserAccountControl.PASSWD_CANT_CHANGE: 64>
ENCRYPTED_TEXT_PWD_ALLOWED =
<UserAccountControl.ENCRYPTED_TEXT_PWD_ALLOWED: 128>
TEMP_DUPLICATE_ACCOUNT =
<UserAccountControl.TEMP_DUPLICATE_ACCOUNT: 256>
NORMAL_ACCOUNT =
<UserAccountControl.NORMAL_ACCOUNT: 512>
INTERDOMAIN_TRUST_ACCOUNT =
<UserAccountControl.INTERDOMAIN_TRUST_ACCOUNT: 2048>
WORKSTATION_TRUST_ACCOUNT =
<UserAccountControl.WORKSTATION_TRUST_ACCOUNT: 4096>
SERVER_TRUST_ACCOUNT =
<UserAccountControl.SERVER_TRUST_ACCOUNT: 8192>
DONT_EXPIRE_PASSWORD =
<UserAccountControl.DONT_EXPIRE_PASSWORD: 65536>
MNS_LOGON_ACCOUNT =
<UserAccountControl.MNS_LOGON_ACCOUNT: 131072>
SMARTCARD_REQUIRED =
<UserAccountControl.SMARTCARD_REQUIRED: 262144>
TRUSTED_FOR_DELEGATION =
<UserAccountControl.TRUSTED_FOR_DELEGATION: 524288>
NOT_DELEGATED =
<UserAccountControl.NOT_DELEGATED: 1048576>
USE_DES_KEY_ONLY =
<UserAccountControl.USE_DES_KEY_ONLY: 2097152>
DONT_REQ_PREAUTH =
<UserAccountControl.DONT_REQ_PREAUTH: 4194304>
PASSWORD_EXPIRED =
<UserAccountControl.PASSWORD_EXPIRED: 8388608>
TRUSTED_TO_AUTH_FOR_DELEGATION =
<UserAccountControl.TRUSTED_TO_AUTH_FOR_DELEGATION: 16777216>
PARTIAL_SECRETS_ACCOUNT =
<UserAccountControl.PARTIAL_SECRETS_ACCOUNT: 67108864>
class MicrosoftActiveDirectory(BaseLDAPSynchronizer):
    """Microsoft-specific LDAP

    Applies the ``pwdLastSet`` and ``userAccountControl`` attributes of a
    directory entry to the matching user.
    """

    @staticmethod
    def name() -> str:
        """Return the name of this synchronizer."""
        return "microsoft_ad"

    def get_objects(self, **kwargs) -> Generator:
        """Yield a single ``None``."""
        yield None

    def sync(self, attributes: dict[str, Any], user: User, created: bool):
        """Run both Active Directory checks for one user.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
            created: Whether ``user`` was created by the current sync.
        """
        self.ms_check_pwd_last_set(attributes, user, created)
        self.ms_check_uac(attributes, user)

    def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool):
        """Make the local password unusable when ``pwdLastSet`` calls for it.

        Returns without touching ``user`` when ``attributes`` has no
        ``pwdLastSet`` key or its value is not a ``datetime``. Otherwise the
        password is made unusable and ``user`` is saved if ``created`` is true
        or the timestamp is at or after ``user.password_change_date``.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
            created: Whether ``user`` was created by the current sync.
        """
        if "pwdLastSet" not in attributes:
            return
        pwd_last_set = attributes["pwdLastSet"]
        if not isinstance(pwd_last_set, datetime):
            # NOTE(review): presumably the LDAP client hands back the raw value
            # when it cannot parse the timestamp -- confirm. Such a value
            # cannot be compared to a datetime, so the check is skipped.
            self._logger.debug(
                "Ignoring pwdLastSet that is not a datetime",
                user=user.username,
                pwd_last_set=pwd_last_set,
            )
            return
        if pwd_last_set.utcoffset() is None:
            # Naive value: label it as UTC, as this check has always done.
            pwd_last_set = pwd_last_set.replace(tzinfo=UTC)
        else:
            # Aware value: convert it, so the instant it denotes is preserved.
            pwd_last_set = pwd_last_set.astimezone(UTC)
        if created or pwd_last_set >= user.password_change_date:
            self._task.info(f"'{user.username}': Reset user's password")
            self._logger.debug(
                "Reset user's password",
                user=user.username,
                created=created,
                pwd_last_set=pwd_last_set,
            )
            user.set_unusable_password()
            user.save()

    def ms_check_uac(self, attributes: dict[str, Any], user: User):
        """Mirror the ``userAccountControl`` flags onto ``user.is_active``.

        Returns without touching ``user`` when ``attributes`` has no
        ``userAccountControl`` key. The user counts as active when neither
        ``ACCOUNTDISABLE`` nor ``LOCKOUT`` is set, and is saved only when that
        differs from the current ``user.is_active``.

        Args:
            attributes: Attributes of the directory entry being synced.
            user: User the entry belongs to.
        """
        if "userAccountControl" not in attributes:
            return
        # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
        # /useraccountcontrol-manipulate-account-properties
        # int() lets a numeric string through; integers pass unchanged.
        uac = UserAccountControl(int(attributes["userAccountControl"]))
        blocking = UserAccountControl.ACCOUNTDISABLE | UserAccountControl.LOCKOUT
        is_active = not uac & blocking
        if is_active != user.is_active:
            user.is_active = is_active
            user.save()
Microsoft-specific LDAP
def
sync( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
def sync(self, attributes: dict[str, Any], user: User, created: bool):
    """Run both Active Directory checks for one user.

    Args:
        attributes: Attributes of the directory entry being synced.
        user: User the entry belongs to.
        created: Whether ``user`` was created by the current sync.
    """
    # The password check runs first and the account-state check second.
    self.ms_check_pwd_last_set(attributes, user, created)
    self.ms_check_uac(attributes, user)
Sync function, implemented in subclass
def
ms_check_pwd_last_set( self, attributes: dict[str, typing.Any], user: authentik.core.models.User, created: bool):
57 def ms_check_pwd_last_set(self, attributes: dict[str, Any], user: User, created: bool): 58 """Check pwdLastSet""" 59 if "pwdLastSet" not in attributes: 60 return 61 pwd_last_set: datetime = attributes.get("pwdLastSet", datetime.now()) 62 pwd_last_set = pwd_last_set.replace(tzinfo=UTC) 63 if created or pwd_last_set >= user.password_change_date: 64 self._task.info(f"'{user.username}': Reset user's password") 65 self._logger.debug( 66 "Reset user's password", 67 user=user.username, 68 created=created, 69 pwd_last_set=pwd_last_set, 70 ) 71 user.set_unusable_password() 72 user.save()
Check pwdLastSet
def ms_check_uac(self, attributes: dict[str, Any], user: User):
    """Mirror the ``userAccountControl`` flags onto ``user.is_active``.

    Returns without touching ``user`` when ``attributes`` has no
    ``userAccountControl`` key. The user counts as active when neither
    ``ACCOUNTDISABLE`` nor ``LOCKOUT`` is set, and is saved only when that
    differs from the current ``user.is_active``.

    Args:
        attributes: Attributes of the directory entry being synced.
        user: User the entry belongs to.
    """
    if "userAccountControl" not in attributes:
        return
    # https://docs.microsoft.com/en-us/troubleshoot/windows-server/identity
    # /useraccountcontrol-manipulate-account-properties
    # int() lets a numeric string through; integers pass unchanged.
    uac = UserAccountControl(int(attributes["userAccountControl"]))
    blocking = UserAccountControl.ACCOUNTDISABLE | UserAccountControl.LOCKOUT
    is_active = not uac & blocking
    if is_active != user.is_active:
        user.is_active = is_active
        user.save()
Check userAccountControl