authentik.sources.kerberos.views

Kerberos source SPNEGO views

  1"""Kerberos source SPNEGO views"""
  2
  3from base64 import b64decode, b64encode
  4
  5import gssapi
  6from django.core.cache import cache
  7from django.core.exceptions import SuspiciousOperation
  8from django.http import HttpResponse
  9from django.shortcuts import get_object_or_404, redirect, render, reverse
 10from django.utils.crypto import get_random_string
 11from django.utils.translation import gettext_lazy as _
 12from django.views import View
 13from structlog.stdlib import get_logger
 14
 15from authentik.core.sources.flow_manager import SourceFlowManager
 16from authentik.sources.kerberos.models import (
 17    GroupKerberosSourceConnection,
 18    KerberosSource,
 19    Krb5ConfContext,
 20    UserKerberosSourceConnection,
 21)
 22
# Module-level structlog logger shared by the views below
LOGGER = get_logger()

# HTTP status code for an SPNEGO authentication request (401 Unauthorized)
SPNEGO_REQUEST_STATUS = 401
# Response header that carries the Negotiate challenge back to the client
WWW_AUTHENTICATE = "WWW-Authenticate"
# Request header from which the client's Negotiate token is read
HTTP_AUTHORIZATION = "Authorization"
# Authentication scheme name used in both of the headers above
NEGOTIATE = "Negotiate"

# Prefix of the cache keys under which exported GSSAPI contexts are stored
SPNEGO_STATE_CACHE_PREFIX = "goauthentik.io/sources/spnego"
# Timeout passed to the cache when storing an exported context
SPNEGO_STATE_CACHE_TIMEOUT = 60 * 5  # 5 minutes
 32
 33
 34def add_negotiate_to_response(
 35    response: HttpResponse, token: str | bytes | None = None
 36) -> HttpResponse:
 37    if isinstance(token, str):
 38        token = token.encode()
 39    response[WWW_AUTHENTICATE] = (
 40        NEGOTIATE if token is None else f"{NEGOTIATE} {b64encode(token).decode('ascii')}"
 41    )
 42    return response
 43
 44
 45class SPNEGOView(View):
 46    """SPNEGO login"""
 47
 48    source: KerberosSource
 49
 50    def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
 51        """Get SNPEGO challenge response"""
 52        response = render(
 53            request,
 54            "if/error.html",
 55            context={
 56                "title": _("SPNEGO authentication required"),
 57                "message": _("""
 58                    Make sure you have valid tickets (obtainable via kinit)
 59                    and configured the browser correctly.
 60                    Please contact your administrator.
 61                """),
 62            },
 63            status=401,
 64        )
 65        return add_negotiate_to_response(response, token)
 66
 67    def get_authstr(self, request) -> str | None:
 68        """Get SPNEGO authentication string from headers"""
 69        authorization_header = request.headers.get(HTTP_AUTHORIZATION, "")
 70        if NEGOTIATE.lower() not in authorization_header.lower():
 71            return None
 72
 73        auth_tuple = authorization_header.split(" ", 1)
 74        if not auth_tuple or auth_tuple[0].lower() != NEGOTIATE.lower():
 75            return None
 76        if len(auth_tuple) != 2:  # noqa: PLR2004
 77            raise SuspiciousOperation("Malformed authorization header")
 78        return auth_tuple[1]
 79
 80    def new_state(self) -> str:
 81        """Generate request state"""
 82        return get_random_string(32)
 83
 84    def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
 85        """Get GSSAPI server context from cache or create it"""
 86        server_creds = self.source.get_gssapi_creds()
 87        if server_creds is None:
 88            return None
 89
 90        state = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)
 91
 92        if state:
 93            # pylint: disable=c-extension-no-member
 94            return gssapi.sec_contexts.SecurityContext(
 95                base=gssapi.raw.sec_contexts.import_sec_context(state),
 96            )
 97
 98        return gssapi.sec_contexts.SecurityContext(creds=server_creds, usage="accept")
 99
100    def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
101        """Store the GSSAPI server context in cache"""
102        cache.set(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", ctx.export(), SPNEGO_STATE_CACHE_TIMEOUT)
103
104    # pylint: disable=too-many-return-statements
105    def dispatch(self, request, *args, **kwargs) -> HttpResponse:
106        """Process SPNEGO request"""
107        self.source: KerberosSource = get_object_or_404(
108            KerberosSource,
109            slug=kwargs.get("source_slug", ""),
110            enabled=True,
111        )
112
113        qstring = request.GET if request.method == "GET" else request.POST
114        state = qstring.get("state", None)
115        if not state:
116            return redirect(
117                reverse(
118                    "authentik_sources_kerberos:spnego-login",
119                    kwargs={"source_slug": self.source.slug},
120                )
121                + f"?state={self.new_state()}"
122            )
123
124        authstr = self.get_authstr(request)
125        if not authstr:
126            LOGGER.debug("authstr not present, sending challenge")
127            return self.challenge(request)
128
129        try:
130            in_token = b64decode(authstr)
131        except TypeError, ValueError:
132            return self.challenge(request)
133
134        with Krb5ConfContext(self.source):
135            server_ctx = self.get_server_ctx(state)
136            if not server_ctx:
137                return self.challenge(request)
138
139            try:
140                out_token = server_ctx.step(in_token)
141            except gssapi.exceptions.GSSError as exc:
142                LOGGER.debug("GSSAPI security context failure", exc=exc)
143                return self.challenge(request)
144
145            if not server_ctx.complete or server_ctx.initiator_name is None:
146                self.set_server_ctx(state, server_ctx)
147                return self.challenge(request, out_token)
148
149            def name_to_str(n: gssapi.names.Name) -> str:
150                return n.display_as(n.name_type)
151
152            identifier = name_to_str(server_ctx.initiator_name)
153            context = {
154                "spnego_info": {
155                    "initiator_name": name_to_str(server_ctx.initiator_name),
156                    "target_name": name_to_str(server_ctx.target_name),
157                    "mech": str(server_ctx.mech),
158                    "actual_flags": server_ctx.actual_flags,
159                },
160            }
161
162        response = SPNEGOSourceFlowManager(
163            source=self.source,
164            request=request,
165            identifier=identifier,
166            user_info={
167                "principal": identifier,
168                **context,
169            },
170            policy_context=context,
171        ).get_flow()
172        return add_negotiate_to_response(response, out_token)
173
174
class SPNEGOSourceFlowManager(SourceFlowManager):
    """Flow manager for Kerberos SPNEGO sources

    Only selects the Kerberos connection classes; all behaviour comes from
    ``SourceFlowManager``.
    """

    # NOTE(review): presumably the models the base class uses to link groups
    # and users to this source - confirm against SourceFlowManager
    group_connection_type = GroupKerberosSourceConnection
    user_connection_type = UserKerberosSourceConnection
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
SPNEGO_REQUEST_STATUS = 401
WWW_AUTHENTICATE = 'WWW-Authenticate'
HTTP_AUTHORIZATION = 'Authorization'
NEGOTIATE = 'Negotiate'
SPNEGO_STATE_CACHE_PREFIX = 'goauthentik.io/sources/spnego'
SPNEGO_STATE_CACHE_TIMEOUT = 300
def add_negotiate_to_response( response: django.http.response.HttpResponse, token: str | bytes | None = None) -> django.http.response.HttpResponse:
def add_negotiate_to_response(
    response: HttpResponse, token: str | bytes | None = None
) -> HttpResponse:
    """Set the ``WWW-Authenticate`` Negotiate header on ``response``

    Without a token the header value is the bare scheme name. With a token
    (``str`` values are encoded to bytes first) the base64 form of the token
    is appended after a single space. The same response object is returned.
    """
    if token is None:
        header_value = NEGOTIATE
    else:
        raw_token = token.encode() if isinstance(token, str) else token
        header_value = f"{NEGOTIATE} {b64encode(raw_token).decode('ascii')}"
    response[WWW_AUTHENTICATE] = header_value
    return response
class SPNEGOView(django.views.generic.base.View):
 46class SPNEGOView(View):
 47    """SPNEGO login"""
 48
 49    source: KerberosSource
 50
 51    def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
 52        """Get SNPEGO challenge response"""
 53        response = render(
 54            request,
 55            "if/error.html",
 56            context={
 57                "title": _("SPNEGO authentication required"),
 58                "message": _("""
 59                    Make sure you have valid tickets (obtainable via kinit)
 60                    and configured the browser correctly.
 61                    Please contact your administrator.
 62                """),
 63            },
 64            status=401,
 65        )
 66        return add_negotiate_to_response(response, token)
 67
 68    def get_authstr(self, request) -> str | None:
 69        """Get SPNEGO authentication string from headers"""
 70        authorization_header = request.headers.get(HTTP_AUTHORIZATION, "")
 71        if NEGOTIATE.lower() not in authorization_header.lower():
 72            return None
 73
 74        auth_tuple = authorization_header.split(" ", 1)
 75        if not auth_tuple or auth_tuple[0].lower() != NEGOTIATE.lower():
 76            return None
 77        if len(auth_tuple) != 2:  # noqa: PLR2004
 78            raise SuspiciousOperation("Malformed authorization header")
 79        return auth_tuple[1]
 80
 81    def new_state(self) -> str:
 82        """Generate request state"""
 83        return get_random_string(32)
 84
 85    def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
 86        """Get GSSAPI server context from cache or create it"""
 87        server_creds = self.source.get_gssapi_creds()
 88        if server_creds is None:
 89            return None
 90
 91        state = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)
 92
 93        if state:
 94            # pylint: disable=c-extension-no-member
 95            return gssapi.sec_contexts.SecurityContext(
 96                base=gssapi.raw.sec_contexts.import_sec_context(state),
 97            )
 98
 99        return gssapi.sec_contexts.SecurityContext(creds=server_creds, usage="accept")
100
101    def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
102        """Store the GSSAPI server context in cache"""
103        cache.set(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", ctx.export(), SPNEGO_STATE_CACHE_TIMEOUT)
104
105    # pylint: disable=too-many-return-statements
106    def dispatch(self, request, *args, **kwargs) -> HttpResponse:
107        """Process SPNEGO request"""
108        self.source: KerberosSource = get_object_or_404(
109            KerberosSource,
110            slug=kwargs.get("source_slug", ""),
111            enabled=True,
112        )
113
114        qstring = request.GET if request.method == "GET" else request.POST
115        state = qstring.get("state", None)
116        if not state:
117            return redirect(
118                reverse(
119                    "authentik_sources_kerberos:spnego-login",
120                    kwargs={"source_slug": self.source.slug},
121                )
122                + f"?state={self.new_state()}"
123            )
124
125        authstr = self.get_authstr(request)
126        if not authstr:
127            LOGGER.debug("authstr not present, sending challenge")
128            return self.challenge(request)
129
130        try:
131            in_token = b64decode(authstr)
132        except TypeError, ValueError:
133            return self.challenge(request)
134
135        with Krb5ConfContext(self.source):
136            server_ctx = self.get_server_ctx(state)
137            if not server_ctx:
138                return self.challenge(request)
139
140            try:
141                out_token = server_ctx.step(in_token)
142            except gssapi.exceptions.GSSError as exc:
143                LOGGER.debug("GSSAPI security context failure", exc=exc)
144                return self.challenge(request)
145
146            if not server_ctx.complete or server_ctx.initiator_name is None:
147                self.set_server_ctx(state, server_ctx)
148                return self.challenge(request, out_token)
149
150            def name_to_str(n: gssapi.names.Name) -> str:
151                return n.display_as(n.name_type)
152
153            identifier = name_to_str(server_ctx.initiator_name)
154            context = {
155                "spnego_info": {
156                    "initiator_name": name_to_str(server_ctx.initiator_name),
157                    "target_name": name_to_str(server_ctx.target_name),
158                    "mech": str(server_ctx.mech),
159                    "actual_flags": server_ctx.actual_flags,
160                },
161            }
162
163        response = SPNEGOSourceFlowManager(
164            source=self.source,
165            request=request,
166            identifier=identifier,
167            user_info={
168                "principal": identifier,
169                **context,
170            },
171            policy_context=context,
172        ).get_flow()
173        return add_negotiate_to_response(response, out_token)

SPNEGO login

def challenge( self, request, token: str | bytes | None = None) -> django.http.response.HttpResponse:
51    def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
52        """Get SNPEGO challenge response"""
53        response = render(
54            request,
55            "if/error.html",
56            context={
57                "title": _("SPNEGO authentication required"),
58                "message": _("""
59                    Make sure you have valid tickets (obtainable via kinit)
60                    and configured the browser correctly.
61                    Please contact your administrator.
62                """),
63            },
64            status=401,
65        )
66        return add_negotiate_to_response(response, token)

Get SPNEGO challenge response

def get_authstr(self, request) -> str | None:
68    def get_authstr(self, request) -> str | None:
69        """Get SPNEGO authentication string from headers"""
70        authorization_header = request.headers.get(HTTP_AUTHORIZATION, "")
71        if NEGOTIATE.lower() not in authorization_header.lower():
72            return None
73
74        auth_tuple = authorization_header.split(" ", 1)
75        if not auth_tuple or auth_tuple[0].lower() != NEGOTIATE.lower():
76            return None
77        if len(auth_tuple) != 2:  # noqa: PLR2004
78            raise SuspiciousOperation("Malformed authorization header")
79        return auth_tuple[1]

Get SPNEGO authentication string from headers

def new_state(self) -> str:
81    def new_state(self) -> str:
82        """Generate request state"""
83        return get_random_string(32)

Generate request state

def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
85    def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
86        """Get GSSAPI server context from cache or create it"""
87        server_creds = self.source.get_gssapi_creds()
88        if server_creds is None:
89            return None
90
91        state = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)
92
93        if state:
94            # pylint: disable=c-extension-no-member
95            return gssapi.sec_contexts.SecurityContext(
96                base=gssapi.raw.sec_contexts.import_sec_context(state),
97            )
98
99        return gssapi.sec_contexts.SecurityContext(creds=server_creds, usage="accept")

Get GSSAPI server context from cache or create it

def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
101    def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
102        """Store the GSSAPI server context in cache"""
103        cache.set(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", ctx.export(), SPNEGO_STATE_CACHE_TIMEOUT)

Store the GSSAPI server context in cache

def dispatch(self, request, *args, **kwargs) -> django.http.response.HttpResponse:
106    def dispatch(self, request, *args, **kwargs) -> HttpResponse:
107        """Process SPNEGO request"""
108        self.source: KerberosSource = get_object_or_404(
109            KerberosSource,
110            slug=kwargs.get("source_slug", ""),
111            enabled=True,
112        )
113
114        qstring = request.GET if request.method == "GET" else request.POST
115        state = qstring.get("state", None)
116        if not state:
117            return redirect(
118                reverse(
119                    "authentik_sources_kerberos:spnego-login",
120                    kwargs={"source_slug": self.source.slug},
121                )
122                + f"?state={self.new_state()}"
123            )
124
125        authstr = self.get_authstr(request)
126        if not authstr:
127            LOGGER.debug("authstr not present, sending challenge")
128            return self.challenge(request)
129
130        try:
131            in_token = b64decode(authstr)
132        except TypeError, ValueError:
133            return self.challenge(request)
134
135        with Krb5ConfContext(self.source):
136            server_ctx = self.get_server_ctx(state)
137            if not server_ctx:
138                return self.challenge(request)
139
140            try:
141                out_token = server_ctx.step(in_token)
142            except gssapi.exceptions.GSSError as exc:
143                LOGGER.debug("GSSAPI security context failure", exc=exc)
144                return self.challenge(request)
145
146            if not server_ctx.complete or server_ctx.initiator_name is None:
147                self.set_server_ctx(state, server_ctx)
148                return self.challenge(request, out_token)
149
150            def name_to_str(n: gssapi.names.Name) -> str:
151                return n.display_as(n.name_type)
152
153            identifier = name_to_str(server_ctx.initiator_name)
154            context = {
155                "spnego_info": {
156                    "initiator_name": name_to_str(server_ctx.initiator_name),
157                    "target_name": name_to_str(server_ctx.target_name),
158                    "mech": str(server_ctx.mech),
159                    "actual_flags": server_ctx.actual_flags,
160                },
161            }
162
163        response = SPNEGOSourceFlowManager(
164            source=self.source,
165            request=request,
166            identifier=identifier,
167            user_info={
168                "principal": identifier,
169                **context,
170            },
171            policy_context=context,
172        ).get_flow()
173        return add_negotiate_to_response(response, out_token)

Process SPNEGO request

class SPNEGOSourceFlowManager(authentik.core.sources.flow_manager.SourceFlowManager):
class SPNEGOSourceFlowManager(SourceFlowManager):
    """Flow manager for Kerberos SPNEGO sources

    Only selects the Kerberos connection classes; all behaviour comes from
    ``SourceFlowManager``.
    """

    # NOTE(review): presumably the models the base class uses to link groups
    # and users to this source - confirm against SourceFlowManager
    group_connection_type = GroupKerberosSourceConnection
    user_connection_type = UserKerberosSourceConnection

Flow manager for Kerberos SPNEGO sources