authentik.sources.kerberos.auth

authentik Kerberos Authentication Backend

  1"""authentik Kerberos Authentication Backend"""
  2
  3import gssapi
  4from django.http import HttpRequest
  5from structlog.stdlib import get_logger
  6
  7from authentik.core.auth import InbuiltBackend
  8from authentik.core.models import User
  9from authentik.lib.generators import generate_id
 10from authentik.sources.kerberos.models import (
 11    KerberosSource,
 12    Krb5ConfContext,
 13    UserKerberosSourceConnection,
 14)
 15
 16LOGGER = get_logger()
 17
 18
 19class KerberosBackend(InbuiltBackend):
 20    """Authenticate users against Kerberos realm"""
 21
 22    def authenticate(self, request: HttpRequest, **kwargs):
 23        """Try to authenticate a user via kerberos"""
 24        if "password" not in kwargs or "username" not in kwargs:
 25            return None
 26        username = kwargs.pop("username")
 27        realm = None
 28        if "@" in username:
 29            username, realm = username.rsplit("@", 1)
 30
 31        user, source = self.auth_user(request, username, realm, **kwargs)
 32        if user:
 33            self.set_method("kerberos", request, source=source)
 34            return user
 35        return None
 36
 37    def auth_user(
 38        self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
 39    ) -> tuple[User | None, KerberosSource | None]:
 40        sources = KerberosSource.objects.filter(enabled=True)
 41        user = User.objects.filter(
 42            usersourceconnection__source__in=sources, username=username, **filters
 43        ).first()
 44
 45        if user is not None:
 46            # User found, let's get its connections for the sources that are available
 47            user_source_connections = UserKerberosSourceConnection.objects.filter(
 48                user=user, source__in=sources
 49            )
 50        elif realm is not None:
 51            user_source_connections = UserKerberosSourceConnection.objects.filter(
 52                source__in=sources, identifier=f"{username}@{realm}"
 53            )
 54        # no realm specified, we can't do anything
 55        else:
 56            user_source_connections = UserKerberosSourceConnection.objects.none()
 57
 58        if not user_source_connections.exists():
 59            LOGGER.debug("no kerberos source found for user", username=username)
 60            return None, None
 61
 62        for user_source_connection in user_source_connections.prefetch_related().select_related(
 63            "source__kerberossource"
 64        ):
 65            # User either has an unusable password,
 66            # or has a password, but couldn't be authenticated by ModelBackend
 67            # This means we check with a kinit to see if the Kerberos password has changed
 68            if self.auth_user_by_kinit(user_source_connection, password):
 69                # Password was successful in kinit to Kerberos, so we save it in database
 70                if (
 71                    user_source_connection.source.kerberossource.password_login_update_internal_password
 72                ):
 73                    LOGGER.debug(
 74                        "Updating user's password in DB",
 75                        source=user_source_connection.source,
 76                        user=user_source_connection.user,
 77                    )
 78                    user_source_connection.user.set_password(
 79                        password, sender=user_source_connection.source, request=request
 80                    )
 81                    user_source_connection.user.save()
 82                return user_source_connection.user, user_source_connection.source
 83            # Password doesn't match, onto next source
 84            LOGGER.debug(
 85                "failed to kinit, password invalid",
 86                source=user_source_connection.source,
 87                user=user_source_connection.user,
 88            )
 89        # No source with valid password found
 90        LOGGER.debug("no valid kerberos source found for user", user=user)
 91        return None, None
 92
 93    def auth_user_by_kinit(
 94        self, user_source_connection: UserKerberosSourceConnection, password: str
 95    ) -> bool:
 96        """Attempt authentication by kinit to the source."""
 97        LOGGER.debug(
 98            "Attempting to kinit as user",
 99            user=user_source_connection.user,
100            source=user_source_connection.source,
101            principal=user_source_connection.identifier,
102        )
103
104        with Krb5ConfContext(user_source_connection.source.kerberossource):
105            name = gssapi.raw.import_name(
106                user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
107            )
108            try:
109                # Use a temporary credentials cache to not interfere with whatever is defined
110                # elsewhere
111                gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
112                gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
113                # Restore the credentials cache to what it was before
114                gssapi.raw.ext_krb5.krb5_ccache_name(None)
115                return True
116            except gssapi.exceptions.GSSError as exc:
117                LOGGER.warning("failed to kinit", exc=exc)
118        return False
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class KerberosBackend(authentik.core.auth.InbuiltBackend):
 20class KerberosBackend(InbuiltBackend):
 21    """Authenticate users against Kerberos realm"""
 22
 23    def authenticate(self, request: HttpRequest, **kwargs):
 24        """Try to authenticate a user via kerberos"""
 25        if "password" not in kwargs or "username" not in kwargs:
 26            return None
 27        username = kwargs.pop("username")
 28        realm = None
 29        if "@" in username:
 30            username, realm = username.rsplit("@", 1)
 31
 32        user, source = self.auth_user(request, username, realm, **kwargs)
 33        if user:
 34            self.set_method("kerberos", request, source=source)
 35            return user
 36        return None
 37
 38    def auth_user(
 39        self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
 40    ) -> tuple[User | None, KerberosSource | None]:
 41        sources = KerberosSource.objects.filter(enabled=True)
 42        user = User.objects.filter(
 43            usersourceconnection__source__in=sources, username=username, **filters
 44        ).first()
 45
 46        if user is not None:
 47            # User found, let's get its connections for the sources that are available
 48            user_source_connections = UserKerberosSourceConnection.objects.filter(
 49                user=user, source__in=sources
 50            )
 51        elif realm is not None:
 52            user_source_connections = UserKerberosSourceConnection.objects.filter(
 53                source__in=sources, identifier=f"{username}@{realm}"
 54            )
 55        # no realm specified, we can't do anything
 56        else:
 57            user_source_connections = UserKerberosSourceConnection.objects.none()
 58
 59        if not user_source_connections.exists():
 60            LOGGER.debug("no kerberos source found for user", username=username)
 61            return None, None
 62
 63        for user_source_connection in user_source_connections.prefetch_related().select_related(
 64            "source__kerberossource"
 65        ):
 66            # User either has an unusable password,
 67            # or has a password, but couldn't be authenticated by ModelBackend
 68            # This means we check with a kinit to see if the Kerberos password has changed
 69            if self.auth_user_by_kinit(user_source_connection, password):
 70                # Password was successful in kinit to Kerberos, so we save it in database
 71                if (
 72                    user_source_connection.source.kerberossource.password_login_update_internal_password
 73                ):
 74                    LOGGER.debug(
 75                        "Updating user's password in DB",
 76                        source=user_source_connection.source,
 77                        user=user_source_connection.user,
 78                    )
 79                    user_source_connection.user.set_password(
 80                        password, sender=user_source_connection.source, request=request
 81                    )
 82                    user_source_connection.user.save()
 83                return user_source_connection.user, user_source_connection.source
 84            # Password doesn't match, onto next source
 85            LOGGER.debug(
 86                "failed to kinit, password invalid",
 87                source=user_source_connection.source,
 88                user=user_source_connection.user,
 89            )
 90        # No source with valid password found
 91        LOGGER.debug("no valid kerberos source found for user", user=user)
 92        return None, None
 93
 94    def auth_user_by_kinit(
 95        self, user_source_connection: UserKerberosSourceConnection, password: str
 96    ) -> bool:
 97        """Attempt authentication by kinit to the source."""
 98        LOGGER.debug(
 99            "Attempting to kinit as user",
100            user=user_source_connection.user,
101            source=user_source_connection.source,
102            principal=user_source_connection.identifier,
103        )
104
105        with Krb5ConfContext(user_source_connection.source.kerberossource):
106            name = gssapi.raw.import_name(
107                user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
108            )
109            try:
110                # Use a temporary credentials cache to not interfere with whatever is defined
111                # elsewhere
112                gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
113                gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
114                # Restore the credentials cache to what it was before
115                gssapi.raw.ext_krb5.krb5_ccache_name(None)
116                return True
117            except gssapi.exceptions.GSSError as exc:
118                LOGGER.warning("failed to kinit", exc=exc)
119        return False

Authenticate users against Kerberos realm

def authenticate(self, request: django.http.request.HttpRequest, **kwargs):
23    def authenticate(self, request: HttpRequest, **kwargs):
24        """Try to authenticate a user via kerberos"""
25        if "password" not in kwargs or "username" not in kwargs:
26            return None
27        username = kwargs.pop("username")
28        realm = None
29        if "@" in username:
30            username, realm = username.rsplit("@", 1)
31
32        user, source = self.auth_user(request, username, realm, **kwargs)
33        if user:
34            self.set_method("kerberos", request, source=source)
35            return user
36        return None

Try to authenticate a user via kerberos

def auth_user( self, request: django.http.request.HttpRequest, username: str, realm: str | None, password: str, **filters) -> tuple[authentik.core.models.User | None, authentik.sources.kerberos.models.KerberosSource | None]:
38    def auth_user(
39        self, request: HttpRequest, username: str, realm: str | None, password: str, **filters
40    ) -> tuple[User | None, KerberosSource | None]:
41        sources = KerberosSource.objects.filter(enabled=True)
42        user = User.objects.filter(
43            usersourceconnection__source__in=sources, username=username, **filters
44        ).first()
45
46        if user is not None:
47            # User found, let's get its connections for the sources that are available
48            user_source_connections = UserKerberosSourceConnection.objects.filter(
49                user=user, source__in=sources
50            )
51        elif realm is not None:
52            user_source_connections = UserKerberosSourceConnection.objects.filter(
53                source__in=sources, identifier=f"{username}@{realm}"
54            )
55        # no realm specified, we can't do anything
56        else:
57            user_source_connections = UserKerberosSourceConnection.objects.none()
58
59        if not user_source_connections.exists():
60            LOGGER.debug("no kerberos source found for user", username=username)
61            return None, None
62
63        for user_source_connection in user_source_connections.prefetch_related().select_related(
64            "source__kerberossource"
65        ):
66            # User either has an unusable password,
67            # or has a password, but couldn't be authenticated by ModelBackend
68            # This means we check with a kinit to see if the Kerberos password has changed
69            if self.auth_user_by_kinit(user_source_connection, password):
70                # Password was successful in kinit to Kerberos, so we save it in database
71                if (
72                    user_source_connection.source.kerberossource.password_login_update_internal_password
73                ):
74                    LOGGER.debug(
75                        "Updating user's password in DB",
76                        source=user_source_connection.source,
77                        user=user_source_connection.user,
78                    )
79                    user_source_connection.user.set_password(
80                        password, sender=user_source_connection.source, request=request
81                    )
82                    user_source_connection.user.save()
83                return user_source_connection.user, user_source_connection.source
84            # Password doesn't match, onto next source
85            LOGGER.debug(
86                "failed to kinit, password invalid",
87                source=user_source_connection.source,
88                user=user_source_connection.user,
89            )
90        # No source with valid password found
91        LOGGER.debug("no valid kerberos source found for user", user=user)
92        return None, None
def auth_user_by_kinit( self, user_source_connection: authentik.sources.kerberos.models.UserKerberosSourceConnection, password: str) -> bool:
 94    def auth_user_by_kinit(
 95        self, user_source_connection: UserKerberosSourceConnection, password: str
 96    ) -> bool:
 97        """Attempt authentication by kinit to the source."""
 98        LOGGER.debug(
 99            "Attempting to kinit as user",
100            user=user_source_connection.user,
101            source=user_source_connection.source,
102            principal=user_source_connection.identifier,
103        )
104
105        with Krb5ConfContext(user_source_connection.source.kerberossource):
106            name = gssapi.raw.import_name(
107                user_source_connection.identifier.encode(), gssapi.raw.NameType.kerberos_principal
108            )
109            try:
110                # Use a temporary credentials cache to not interfere with whatever is defined
111                # elsewhere
112                gssapi.raw.ext_krb5.krb5_ccache_name(f"MEMORY:{generate_id(12)}".encode())
113                gssapi.raw.ext_password.acquire_cred_with_password(name, password.encode())
114                # Restore the credentials cache to what it was before
115                gssapi.raw.ext_krb5.krb5_ccache_name(None)
116                return True
117            except gssapi.exceptions.GSSError as exc:
118                LOGGER.warning("failed to kinit", exc=exc)
119        return False

Attempt authentication by kinit to the source.