authentik.sources.kerberos.views
Kerberos source SPNEGO views
"""Kerberos source SPNEGO views"""

from base64 import b64decode, b64encode

import gssapi
from django.core.cache import cache
from django.core.exceptions import SuspiciousOperation
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect, render, reverse
from django.utils.crypto import get_random_string
from django.utils.translation import gettext_lazy as _
from django.views import View
from structlog.stdlib import get_logger

from authentik.core.sources.flow_manager import SourceFlowManager
from authentik.sources.kerberos.models import (
    GroupKerberosSourceConnection,
    KerberosSource,
    Krb5ConfContext,
    UserKerberosSourceConnection,
)

LOGGER = get_logger()

# HTTP status sent together with a ``WWW-Authenticate: Negotiate`` challenge
SPNEGO_REQUEST_STATUS = 401
WWW_AUTHENTICATE = "WWW-Authenticate"
HTTP_AUTHORIZATION = "Authorization"
NEGOTIATE = "Negotiate"

SPNEGO_STATE_CACHE_PREFIX = "goauthentik.io/sources/spnego"
SPNEGO_STATE_CACHE_TIMEOUT = 60 * 5  # 5 minutes


def add_negotiate_to_response(
    response: HttpResponse, token: str | bytes | None = None
) -> HttpResponse:
    """Set the ``WWW-Authenticate`` header of ``response`` for SPNEGO

    Without a token the header is the bare ``Negotiate`` scheme; with a token
    it is ``Negotiate <base64(token)>``. A ``str`` token is encoded to bytes
    first. The same response object is returned.
    """
    if isinstance(token, str):
        token = token.encode()
    response[WWW_AUTHENTICATE] = (
        NEGOTIATE if token is None else f"{NEGOTIATE} {b64encode(token).decode('ascii')}"
    )
    return response


class SPNEGOView(View):
    """SPNEGO login"""

    source: KerberosSource

    def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
        """Get SPNEGO challenge response

        Renders the generic error page with status ``SPNEGO_REQUEST_STATUS``
        and attaches the ``Negotiate`` header, including ``token`` if given.
        """
        response = render(
            request,
            "if/error.html",
            context={
                "title": _("SPNEGO authentication required"),
                # The literal (whitespace included) is what is handed to the
                # translation lookup, so it is left exactly as it was.
                "message": _("""
                    Make sure you have valid tickets (obtainable via kinit)
                    and configured the browser correctly.
                    Please contact your administrator.
                """),
            },
            # Use the module constant instead of a second hard-coded 401
            status=SPNEGO_REQUEST_STATUS,
        )
        return add_negotiate_to_response(response, token)

    def get_authstr(self, request) -> str | None:
        """Get SPNEGO authentication string from headers

        Returns ``None`` when the ``Authorization`` header is missing or does
        not use the ``Negotiate`` scheme (compared case-insensitively).

        Raises:
            SuspiciousOperation: the scheme is ``Negotiate`` but nothing
                follows it after a space.
        """
        authorization_header = request.headers.get(HTTP_AUTHORIZATION, "")
        if NEGOTIATE.lower() not in authorization_header.lower():
            return None

        auth_tuple = authorization_header.split(" ", 1)
        if not auth_tuple or auth_tuple[0].lower() != NEGOTIATE.lower():
            return None
        if len(auth_tuple) != 2:  # noqa: PLR2004
            raise SuspiciousOperation("Malformed authorization header")
        return auth_tuple[1]

    def new_state(self) -> str:
        """Generate request state"""
        return get_random_string(32)

    def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
        """Get GSSAPI server context from cache or create it

        Returns ``None`` when the source provides no GSSAPI credentials.
        """
        server_creds = self.source.get_gssapi_creds()
        if server_creds is None:
            return None

        state = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)

        if state:
            # A previous round stored an exported context: resume from it
            # pylint: disable=c-extension-no-member
            return gssapi.sec_contexts.SecurityContext(
                base=gssapi.raw.sec_contexts.import_sec_context(state),
            )

        return gssapi.sec_contexts.SecurityContext(creds=server_creds, usage="accept")

    def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
        """Store the GSSAPI server context in cache"""
        cache.set(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", ctx.export(), SPNEGO_STATE_CACHE_TIMEOUT)

    # pylint: disable=too-many-return-statements
    def dispatch(self, request, *args, **kwargs) -> HttpResponse:
        """Process SPNEGO request

        Steps, in order:

        1. Resolve the enabled ``KerberosSource`` from ``source_slug`` (404 otherwise).
        2. Without a ``state`` parameter, redirect to the same view with a new one.
        3. Without a usable ``Negotiate`` token, answer with a challenge.
        4. Feed the token to the GSSAPI context; while the context is
           incomplete, cache it under ``state`` and challenge again with the
           output token.
        5. Once complete, hand the initiator principal to the source flow
           manager and attach the final output token to its response.
        """
        self.source: KerberosSource = get_object_or_404(
            KerberosSource,
            slug=kwargs.get("source_slug", ""),
            enabled=True,
        )

        qstring = request.GET if request.method == "GET" else request.POST
        state = qstring.get("state", None)
        if not state:
            return redirect(
                reverse(
                    "authentik_sources_kerberos:spnego-login",
                    kwargs={"source_slug": self.source.slug},
                )
                + f"?state={self.new_state()}"
            )

        authstr = self.get_authstr(request)
        if not authstr:
            LOGGER.debug("authstr not present, sending challenge")
            return self.challenge(request)

        try:
            in_token = b64decode(authstr)
        # Parenthesized so the clause also parses on interpreters older than
        # 3.14; invalid base64 surfaces as binascii.Error, a ValueError.
        except (TypeError, ValueError):
            return self.challenge(request)

        with Krb5ConfContext(self.source):
            server_ctx = self.get_server_ctx(state)
            if not server_ctx:
                return self.challenge(request)

            try:
                out_token = server_ctx.step(in_token)
            except gssapi.exceptions.GSSError as exc:
                LOGGER.debug("GSSAPI security context failure", exc=exc)
                return self.challenge(request)

            if not server_ctx.complete or server_ctx.initiator_name is None:
                # Negotiation needs another round: keep the context for it
                self.set_server_ctx(state, server_ctx)
                return self.challenge(request, out_token)

            def name_to_str(n: gssapi.names.Name) -> str:
                return n.display_as(n.name_type)

            identifier = name_to_str(server_ctx.initiator_name)
            context = {
                "spnego_info": {
                    # Same value as ``identifier``; no need to convert twice
                    "initiator_name": identifier,
                    "target_name": name_to_str(server_ctx.target_name),
                    "mech": str(server_ctx.mech),
                    "actual_flags": server_ctx.actual_flags,
                },
            }

        response = SPNEGOSourceFlowManager(
            source=self.source,
            request=request,
            identifier=identifier,
            user_info={
                "principal": identifier,
                **context,
            },
            policy_context=context,
        ).get_flow()
        return add_negotiate_to_response(response, out_token)


class SPNEGOSourceFlowManager(SourceFlowManager):
    """Flow manager for Kerberos SPNEGO sources"""

    user_connection_type = UserKerberosSourceConnection
    group_connection_type = GroupKerberosSourceConnection
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
SPNEGO_REQUEST_STATUS =
401
WWW_AUTHENTICATE =
'WWW-Authenticate'
HTTP_AUTHORIZATION =
'Authorization'
NEGOTIATE =
'Negotiate'
SPNEGO_STATE_CACHE_PREFIX =
'goauthentik.io/sources/spnego'
SPNEGO_STATE_CACHE_TIMEOUT =
300
def
add_negotiate_to_response( response: django.http.response.HttpResponse, token: str | bytes | None = None) -> django.http.response.HttpResponse:
def add_negotiate_to_response(
    response: HttpResponse, token: str | bytes | None = None
) -> HttpResponse:
    """Attach the SPNEGO ``WWW-Authenticate`` header to ``response``

    With no token the header carries only the ``Negotiate`` scheme. Otherwise
    the token (encoded to bytes first when given as ``str``) is base64-encoded
    and appended after the scheme. The response passed in is returned.
    """
    if token is None:
        response[WWW_AUTHENTICATE] = NEGOTIATE
        return response

    raw_token = token.encode() if isinstance(token, str) else token
    encoded_token = b64encode(raw_token).decode("ascii")
    response[WWW_AUTHENTICATE] = f"{NEGOTIATE} {encoded_token}"
    return response
class
SPNEGOView(django.views.generic.base.View):
class SPNEGOView(View):
    """SPNEGO login"""

    source: KerberosSource

    def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
        """Get SPNEGO challenge response

        Renders the generic error page with status ``SPNEGO_REQUEST_STATUS``
        and attaches the ``Negotiate`` header, including ``token`` if given.
        """
        response = render(
            request,
            "if/error.html",
            context={
                "title": _("SPNEGO authentication required"),
                # The literal (whitespace included) is what is handed to the
                # translation lookup, so it is left exactly as it was.
                "message": _("""
                    Make sure you have valid tickets (obtainable via kinit)
                    and configured the browser correctly.
                    Please contact your administrator.
                """),
            },
            # Use the module constant instead of a second hard-coded 401
            status=SPNEGO_REQUEST_STATUS,
        )
        return add_negotiate_to_response(response, token)

    def get_authstr(self, request) -> str | None:
        """Get SPNEGO authentication string from headers

        Returns ``None`` when the ``Authorization`` header is missing or does
        not use the ``Negotiate`` scheme (compared case-insensitively).

        Raises:
            SuspiciousOperation: the scheme is ``Negotiate`` but nothing
                follows it after a space.
        """
        authorization_header = request.headers.get(HTTP_AUTHORIZATION, "")
        if NEGOTIATE.lower() not in authorization_header.lower():
            return None

        auth_tuple = authorization_header.split(" ", 1)
        if not auth_tuple or auth_tuple[0].lower() != NEGOTIATE.lower():
            return None
        if len(auth_tuple) != 2:  # noqa: PLR2004
            raise SuspiciousOperation("Malformed authorization header")
        return auth_tuple[1]

    def new_state(self) -> str:
        """Generate request state"""
        return get_random_string(32)

    def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
        """Get GSSAPI server context from cache or create it

        Returns ``None`` when the source provides no GSSAPI credentials.
        """
        server_creds = self.source.get_gssapi_creds()
        if server_creds is None:
            return None

        state = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)

        if state:
            # A previous round stored an exported context: resume from it
            # pylint: disable=c-extension-no-member
            return gssapi.sec_contexts.SecurityContext(
                base=gssapi.raw.sec_contexts.import_sec_context(state),
            )

        return gssapi.sec_contexts.SecurityContext(creds=server_creds, usage="accept")

    def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
        """Store the GSSAPI server context in cache"""
        cache.set(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", ctx.export(), SPNEGO_STATE_CACHE_TIMEOUT)

    # pylint: disable=too-many-return-statements
    def dispatch(self, request, *args, **kwargs) -> HttpResponse:
        """Process SPNEGO request

        Steps, in order:

        1. Resolve the enabled ``KerberosSource`` from ``source_slug`` (404 otherwise).
        2. Without a ``state`` parameter, redirect to the same view with a new one.
        3. Without a usable ``Negotiate`` token, answer with a challenge.
        4. Feed the token to the GSSAPI context; while the context is
           incomplete, cache it under ``state`` and challenge again with the
           output token.
        5. Once complete, hand the initiator principal to the source flow
           manager and attach the final output token to its response.
        """
        self.source: KerberosSource = get_object_or_404(
            KerberosSource,
            slug=kwargs.get("source_slug", ""),
            enabled=True,
        )

        qstring = request.GET if request.method == "GET" else request.POST
        state = qstring.get("state", None)
        if not state:
            return redirect(
                reverse(
                    "authentik_sources_kerberos:spnego-login",
                    kwargs={"source_slug": self.source.slug},
                )
                + f"?state={self.new_state()}"
            )

        authstr = self.get_authstr(request)
        if not authstr:
            LOGGER.debug("authstr not present, sending challenge")
            return self.challenge(request)

        try:
            in_token = b64decode(authstr)
        # Parenthesized so the clause also parses on interpreters older than
        # 3.14; invalid base64 surfaces as binascii.Error, a ValueError.
        except (TypeError, ValueError):
            return self.challenge(request)

        with Krb5ConfContext(self.source):
            server_ctx = self.get_server_ctx(state)
            if not server_ctx:
                return self.challenge(request)

            try:
                out_token = server_ctx.step(in_token)
            except gssapi.exceptions.GSSError as exc:
                LOGGER.debug("GSSAPI security context failure", exc=exc)
                return self.challenge(request)

            if not server_ctx.complete or server_ctx.initiator_name is None:
                # Negotiation needs another round: keep the context for it
                self.set_server_ctx(state, server_ctx)
                return self.challenge(request, out_token)

            def name_to_str(n: gssapi.names.Name) -> str:
                return n.display_as(n.name_type)

            identifier = name_to_str(server_ctx.initiator_name)
            context = {
                "spnego_info": {
                    # Same value as ``identifier``; no need to convert twice
                    "initiator_name": identifier,
                    "target_name": name_to_str(server_ctx.target_name),
                    "mech": str(server_ctx.mech),
                    "actual_flags": server_ctx.actual_flags,
                },
            }

        response = SPNEGOSourceFlowManager(
            source=self.source,
            request=request,
            identifier=identifier,
            user_info={
                "principal": identifier,
                **context,
            },
            policy_context=context,
        ).get_flow()
        return add_negotiate_to_response(response, out_token)
SPNEGO login
def
challenge( self, request, token: str | bytes | None = None) -> django.http.response.HttpResponse:
def challenge(self, request, token: str | bytes | None = None) -> HttpResponse:
    """Get SPNEGO challenge response

    Renders the generic error page with status ``SPNEGO_REQUEST_STATUS`` and
    attaches the ``Negotiate`` header, including ``token`` if one is given.
    """
    response = render(
        request,
        "if/error.html",
        context={
            "title": _("SPNEGO authentication required"),
            # The literal (whitespace included) is what is handed to the
            # translation lookup, so it is left exactly as it was.
            "message": _("""
                    Make sure you have valid tickets (obtainable via kinit)
                    and configured the browser correctly.
                    Please contact your administrator.
                """),
        },
        # Use the module constant instead of a second hard-coded 401
        status=SPNEGO_REQUEST_STATUS,
    )
    return add_negotiate_to_response(response, token)
Get SPNEGO challenge response
def
get_authstr(self, request) -> str | None:
def get_authstr(self, request) -> str | None:
    """Extract the SPNEGO token string from the ``Authorization`` header

    Returns ``None`` if the header is absent or its scheme is not
    ``Negotiate`` (case-insensitive); otherwise returns whatever follows the
    first space.

    Raises:
        SuspiciousOperation: the header is the ``Negotiate`` scheme alone,
            with no space-separated value after it.
    """
    header = request.headers.get(HTTP_AUTHORIZATION, "")
    scheme = NEGOTIATE.lower()

    # Cheap pre-check: the scheme does not appear anywhere in the header
    if scheme not in header.lower():
        return None

    # str.split always yields at least one element, so parts[0] is safe
    parts = header.split(" ", 1)
    if parts[0].lower() != scheme:
        return None
    if len(parts) == 2:  # noqa: PLR2004
        return parts[1]
    raise SuspiciousOperation("Malformed authorization header")
Get SPNEGO authentication string from headers
def
get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
def get_server_ctx(self, key: str) -> gssapi.sec_contexts.SecurityContext | None:
    """Return the GSSAPI acceptor context for ``key``

    Yields ``None`` when the source has no GSSAPI credentials. If the cache
    holds an exported context under ``key`` it is imported and wrapped;
    otherwise a new context is created from the source credentials.
    """
    creds = self.source.get_gssapi_creds()
    if creds is None:
        return None

    exported = cache.get(f"{SPNEGO_STATE_CACHE_PREFIX}/{key}", None)
    if not exported:
        # Nothing cached for this key: start a new negotiation
        return gssapi.sec_contexts.SecurityContext(creds=creds, usage="accept")

    # pylint: disable=c-extension-no-member
    restored = gssapi.raw.sec_contexts.import_sec_context(exported)
    return gssapi.sec_contexts.SecurityContext(base=restored)
Get GSSAPI server context from cache or create it
def
set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
def set_server_ctx(self, key: str, ctx: gssapi.sec_contexts.SecurityContext):
    """Cache the exported form of ``ctx`` under ``key``

    The entry lives for ``SPNEGO_STATE_CACHE_TIMEOUT`` seconds.
    """
    cache_key = f"{SPNEGO_STATE_CACHE_PREFIX}/{key}"
    exported = ctx.export()
    cache.set(cache_key, exported, SPNEGO_STATE_CACHE_TIMEOUT)
Store the GSSAPI server context in cache
def
dispatch(self, request, *args, **kwargs) -> django.http.response.HttpResponse:
def dispatch(self, request, *args, **kwargs) -> HttpResponse:
    """Process SPNEGO request

    Steps, in order:

    1. Resolve the enabled ``KerberosSource`` from ``source_slug`` (404 otherwise).
    2. Without a ``state`` parameter, redirect to the same view with a new one.
    3. Without a usable ``Negotiate`` token, answer with a challenge.
    4. Feed the token to the GSSAPI context; while the context is incomplete,
       cache it under ``state`` and challenge again with the output token.
    5. Once complete, hand the initiator principal to the source flow manager
       and attach the final output token to its response.
    """
    self.source: KerberosSource = get_object_or_404(
        KerberosSource,
        slug=kwargs.get("source_slug", ""),
        enabled=True,
    )

    qstring = request.GET if request.method == "GET" else request.POST
    state = qstring.get("state", None)
    if not state:
        return redirect(
            reverse(
                "authentik_sources_kerberos:spnego-login",
                kwargs={"source_slug": self.source.slug},
            )
            + f"?state={self.new_state()}"
        )

    authstr = self.get_authstr(request)
    if not authstr:
        LOGGER.debug("authstr not present, sending challenge")
        return self.challenge(request)

    try:
        in_token = b64decode(authstr)
    # Parenthesized so the clause also parses on interpreters older than
    # 3.14; invalid base64 surfaces as binascii.Error, a ValueError.
    except (TypeError, ValueError):
        return self.challenge(request)

    with Krb5ConfContext(self.source):
        server_ctx = self.get_server_ctx(state)
        if not server_ctx:
            return self.challenge(request)

        try:
            out_token = server_ctx.step(in_token)
        except gssapi.exceptions.GSSError as exc:
            LOGGER.debug("GSSAPI security context failure", exc=exc)
            return self.challenge(request)

        if not server_ctx.complete or server_ctx.initiator_name is None:
            # Negotiation needs another round: keep the context for it
            self.set_server_ctx(state, server_ctx)
            return self.challenge(request, out_token)

        def name_to_str(n: gssapi.names.Name) -> str:
            return n.display_as(n.name_type)

        identifier = name_to_str(server_ctx.initiator_name)
        context = {
            "spnego_info": {
                # Same value as ``identifier``; no need to convert twice
                "initiator_name": identifier,
                "target_name": name_to_str(server_ctx.target_name),
                "mech": str(server_ctx.mech),
                "actual_flags": server_ctx.actual_flags,
            },
        }

    response = SPNEGOSourceFlowManager(
        source=self.source,
        request=request,
        identifier=identifier,
        user_info={
            "principal": identifier,
            **context,
        },
        policy_context=context,
    ).get_flow()
    return add_negotiate_to_response(response, out_token)
Process SPNEGO request
class SPNEGOSourceFlowManager(SourceFlowManager):
    """Flow manager for Kerberos SPNEGO sources

    Defines no methods of its own; it only points the inherited
    ``SourceFlowManager`` at the Kerberos connection models.
    """

    # Model class for user connections of this source.
    # NOTE(review): how the base class uses it is not visible here - confirm.
    user_connection_type = UserKerberosSourceConnection
    # Model class for group connections of this source (same caveat).
    group_connection_type = GroupKerberosSourceConnection
Flow manager for Kerberos SPNEGO sources
user_connection_type =
<class 'authentik.sources.kerberos.models.UserKerberosSourceConnection'>
group_connection_type =
<class 'authentik.sources.kerberos.models.GroupKerberosSourceConnection'>