authentik.sources.kerberos.auth
authentik Kerberos Authentication Backend
"""authentik Kerberos Authentication Backend"""

import gssapi
from django.http import HttpRequest
from structlog.stdlib import get_logger

from authentik.core.auth import InbuiltBackend
from authentik.core.models import User
from authentik.lib.generators import generate_id
from authentik.sources.kerberos.models import (
    KerberosSource,
    Krb5ConfContext,
    UserKerberosSourceConnection,
)

LOGGER = get_logger()


class KerberosBackend(InbuiltBackend):
    """Authenticate users against Kerberos realm"""

    def authenticate(self, request: HttpRequest, **kwargs):
        """Try to authenticate a user via kerberos

        Returns the authenticated user, or None when "username"/"password" are
        missing from kwargs or no connection accepted the password.
        """
        if "password" not in kwargs or "username" not in kwargs:
            return None
        username = kwargs.pop("username")
        realm = None
        # Split an optional "@REALM" suffix off at the last "@"
        if "@" in username:
            username, realm = username.rsplit("@", 1)

        user, source = self.auth_user(request, username, realm, **kwargs)
        if user:
            self.set_method("kerberos", request, source=source)
            return user
        return None

    def auth_user(
        self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
    ) -> tuple[User | None, KerberosSource | None]:
        """Try the password against each Kerberos connection matching the login.

        Connections are looked up through the user with that username on an enabled
        source, or, if no such user exists and a realm was given, through the
        identifier "username@realm". Returns the user and source of the first
        connection for which kinit succeeds, or (None, None) otherwise.
        """
        sources = KerberosSource.objects.filter(enabled=True)
        user = User.objects.filter(
            usersourceconnection__source__in=sources, username=username, **filters
        ).first()

        if user is not None:
            # User found, let's get its connections for the sources that are available
            user_source_connections = UserKerberosSourceConnection.objects.filter(
                user=user, source__in=sources
            )
        elif realm is not None:
            user_source_connections = UserKerberosSourceConnection.objects.filter(
                source__in=sources, identifier=f"{username}@{realm}"
            )
        # no realm specified, we can't do anything
        else:
            user_source_connections = UserKerberosSourceConnection.objects.none()

        if not user_source_connections.exists():
            LOGGER.debug("no kerberos source found for user", username=username)
            return None, None

        for user_source_connection in user_source_connections.prefetch_related().select_related(
            "source__kerberossource"
        ):
            # User either has an unusable password,
            # or has a password, but couldn't be authenticated by ModelBackend
            # This means we check with a kinit to see if the Kerberos password has changed
            if self.auth_user_by_kinit(user_source_connection, password):
                # Password was successful in kinit to Kerberos, so we save it in database
                if (
                    user_source_connection.source.kerberossource.password_login_update_internal_password
                ):
                    LOGGER.debug(
                        "Updating user's password in DB",
                        source=user_source_connection.source,
                        user=user_source_connection.user,
                    )
                    user_source_connection.user.set_password(
                        password, sender=user_source_connection.source, request=request
                    )
                    user_source_connection.user.save()
                return user_source_connection.user, user_source_connection.source
            # Password doesn't match, onto next source
            LOGGER.debug(
                "failed to kinit, password invalid",
                source=user_source_connection.source,
                user=user_source_connection.user,
            )
        # No source with valid password found
        LOGGER.debug("no valid kerberos source found for user", user=user)
        return None, None

    def auth_user_by_kinit(
        self, user_source_connection: UserKerberosSourceConnection, password: str
    ) -> bool:
        """Attempt authentication by kinit to the source.

        Returns True when credentials could be acquired for the connection's
        identifier with the given password, False when gssapi raised a GSSError.
        """
        LOGGER.debug(
            "Attempting to kinit as user",
            user=user_source_connection.user,
            source=user_source_connection.source,
            principal=user_source_connection.identifier,
        )

        with Krb5ConfContext(user_source_connection.source.kerberossource):
            name = gssapi.raw.import_name(
                user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
            )
            try:
                # Use a temporary credentials cache to not interfere with whatever is defined
                # elsewhere
                gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
                try:
                    gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
                finally:
                    # Restore the credentials cache to what it was before. This must also
                    # happen when the password is rejected, otherwise the temporary cache
                    # name set above would stay in effect after a failed login.
                    gssapi.raw.ext_krb5.krb5_ccache_name(None)
                return True
            except gssapi.exceptions.GSSError as exc:
                LOGGER.warning("failed to kinit", exc=exc)
        return False
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class KerberosBackend(InbuiltBackend):
    """Authenticate users against Kerberos realm"""

    def authenticate(self, request: HttpRequest, **kwargs):
        """Try to authenticate a user via kerberos

        Returns the authenticated user, or None when "username"/"password" are
        missing from kwargs or no connection accepted the password.
        """
        if "password" not in kwargs or "username" not in kwargs:
            return None
        username = kwargs.pop("username")
        realm = None
        # Split an optional "@REALM" suffix off at the last "@"
        if "@" in username:
            username, realm = username.rsplit("@", 1)

        user, source = self.auth_user(request, username, realm, **kwargs)
        if user:
            self.set_method("kerberos", request, source=source)
            return user
        return None

    def auth_user(
        self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
    ) -> tuple[User | None, KerberosSource | None]:
        """Try the password against each Kerberos connection matching the login.

        Connections are looked up through the user with that username on an enabled
        source, or, if no such user exists and a realm was given, through the
        identifier "username@realm". Returns the user and source of the first
        connection for which kinit succeeds, or (None, None) otherwise.
        """
        sources = KerberosSource.objects.filter(enabled=True)
        user = User.objects.filter(
            usersourceconnection__source__in=sources, username=username, **filters
        ).first()

        if user is not None:
            # User found, let's get its connections for the sources that are available
            user_source_connections = UserKerberosSourceConnection.objects.filter(
                user=user, source__in=sources
            )
        elif realm is not None:
            user_source_connections = UserKerberosSourceConnection.objects.filter(
                source__in=sources, identifier=f"{username}@{realm}"
            )
        # no realm specified, we can't do anything
        else:
            user_source_connections = UserKerberosSourceConnection.objects.none()

        if not user_source_connections.exists():
            LOGGER.debug("no kerberos source found for user", username=username)
            return None, None

        for user_source_connection in user_source_connections.prefetch_related().select_related(
            "source__kerberossource"
        ):
            # User either has an unusable password,
            # or has a password, but couldn't be authenticated by ModelBackend
            # This means we check with a kinit to see if the Kerberos password has changed
            if self.auth_user_by_kinit(user_source_connection, password):
                # Password was successful in kinit to Kerberos, so we save it in database
                if (
                    user_source_connection.source.kerberossource.password_login_update_internal_password
                ):
                    LOGGER.debug(
                        "Updating user's password in DB",
                        source=user_source_connection.source,
                        user=user_source_connection.user,
                    )
                    user_source_connection.user.set_password(
                        password, sender=user_source_connection.source, request=request
                    )
                    user_source_connection.user.save()
                return user_source_connection.user, user_source_connection.source
            # Password doesn't match, onto next source
            LOGGER.debug(
                "failed to kinit, password invalid",
                source=user_source_connection.source,
                user=user_source_connection.user,
            )
        # No source with valid password found
        LOGGER.debug("no valid kerberos source found for user", user=user)
        return None, None

    def auth_user_by_kinit(
        self, user_source_connection: UserKerberosSourceConnection, password: str
    ) -> bool:
        """Attempt authentication by kinit to the source.

        Returns True when credentials could be acquired for the connection's
        identifier with the given password, False when gssapi raised a GSSError.
        """
        LOGGER.debug(
            "Attempting to kinit as user",
            user=user_source_connection.user,
            source=user_source_connection.source,
            principal=user_source_connection.identifier,
        )

        with Krb5ConfContext(user_source_connection.source.kerberossource):
            name = gssapi.raw.import_name(
                user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
            )
            try:
                # Use a temporary credentials cache to not interfere with whatever is defined
                # elsewhere
                gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
                try:
                    gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
                finally:
                    # Restore the credentials cache to what it was before. This must also
                    # happen when the password is rejected, otherwise the temporary cache
                    # name set above would stay in effect after a failed login.
                    gssapi.raw.ext_krb5.krb5_ccache_name(None)
                return True
            except gssapi.exceptions.GSSError as exc:
                LOGGER.warning("failed to kinit", exc=exc)
        return False
Authenticate users against Kerberos realm
def
authenticate(self, request: django.http.request.HttpRequest, **kwargs):
def authenticate(self, request: HttpRequest, **kwargs):
    """Try to authenticate a user via kerberos.

    Needs both "username" and "password" in kwargs; returns None when either is
    absent or when auth_user yields no user, otherwise the authenticated user.
    """
    # Nothing to do for this backend unless both credentials were supplied
    if not ("password" in kwargs and "username" in kwargs):
        return None

    login = kwargs.pop("username")
    # "name@REALM" is split at the last "@"; a bare name carries no realm
    if "@" in login:
        principal, realm = login.rsplit("@", 1)
    else:
        principal, realm = login, None

    authenticated, matched_source = self.auth_user(request, principal, realm, **kwargs)
    if not authenticated:
        return None
    self.set_method("kerberos", request, source=matched_source)
    return authenticated
Try to authenticate a user via kerberos
def
auth_user( self, request: django.http.request.HttpRequest, username: str, realm: str | None, password: str, **filters) -> tuple[authentik.core.models.User | None, authentik.sources.kerberos.models.KerberosSource | None]:
def auth_user(
    self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
) -> tuple[User | None, KerberosSource | None]:
    """Check the password against every Kerberos connection matching the login.

    The connections come from the user with this username on an enabled source,
    or, when no such user exists but a realm was given, from the identifier
    "username@realm". The first connection whose kinit succeeds wins and its
    user and source are returned; otherwise the result is (None, None).
    """
    enabled_sources = KerberosSource.objects.filter(enabled=True)
    user = User.objects.filter(
        usersourceconnection__source__in=enabled_sources, username=username, **filters
    ).first()

    connections = UserKerberosSourceConnection.objects
    if user is not None:
        # Known user: consider all of their connections on the enabled sources
        candidates = connections.filter(user=user, source__in=enabled_sources)
    elif realm is not None:
        # Unknown user but a realm was given: match on the full principal
        candidates = connections.filter(
            source__in=enabled_sources, identifier=f"{username}@{realm}"
        )
    else:
        # Neither a user nor a realm, so there is nothing to look up
        candidates = connections.none()

    if not candidates.exists():
        LOGGER.debug("no kerberos source found for user", username=username)
        return None, None

    for connection in candidates.prefetch_related().select_related("source__kerberossource"):
        # The user either has an unusable password or was not authenticated by
        # ModelBackend, so a kinit tells us whether the Kerberos password is valid
        if not self.auth_user_by_kinit(connection, password):
            # Wrong password for this connection, move on to the next one
            LOGGER.debug(
                "failed to kinit, password invalid",
                source=connection.source,
                user=connection.user,
            )
            continue

        # kinit succeeded; store the password locally if the source asks for it
        if connection.source.kerberossource.password_login_update_internal_password:
            LOGGER.debug(
                "Updating user's password in DB",
                source=connection.source,
                user=connection.user,
            )
            connection.user.set_password(password, sender=connection.source, request=request)
            connection.user.save()
        return connection.user, connection.source

    # Every candidate connection rejected the password
    LOGGER.debug("no valid kerberos source found for user", user=user)
    return None, None
def
auth_user_by_kinit( self, user_source_connection: authentik.sources.kerberos.models.UserKerberosSourceConnection, password: str) -> bool:
def auth_user_by_kinit(
    self, user_source_connection: UserKerberosSourceConnection, password: str
) -> bool:
    """Attempt authentication by kinit to the source.

    Returns True when credentials could be acquired for the connection's
    identifier with the given password, False when gssapi raised a GSSError.
    """
    LOGGER.debug(
        "Attempting to kinit as user",
        user=user_source_connection.user,
        source=user_source_connection.source,
        principal=user_source_connection.identifier,
    )

    with Krb5ConfContext(user_source_connection.source.kerberossource):
        name = gssapi.raw.import_name(
            user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
        )
        try:
            # Use a temporary credentials cache to not interfere with whatever is defined
            # elsewhere
            gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
            try:
                gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
            finally:
                # Restore the credentials cache to what it was before. This must also
                # happen when the password is rejected, otherwise the temporary cache
                # name set above would stay in effect after a failed login.
                gssapi.raw.ext_krb5.krb5_ccache_name(None)
            return True
        except gssapi.exceptions.GSSError as exc:
            LOGGER.warning("failed to kinit", exc=exc)
    return False
Attempt authentication by kinit to the source.