authentik.sources.kerberos.tests.test_spnego
Kerberos Source SPNEGO tests
"""Kerberos Source SPNEGO tests"""

from base64 import b64decode, b64encode
from pathlib import Path
from sys import platform
from unittest import skipUnless

import gssapi
from django.urls import reverse

from authentik.core.tests.utils import create_test_admin_user
from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.tests.utils import KerberosTestCase


class TestSPNEGOSource(KerberosTestCase):
    """SPNEGO tests for the Kerberos source: API read access and the login view."""

    def setUp(self):
        """Create the Kerberos source under test from the realm's HTTP keytab."""
        keytab_bytes = Path(self.realm.http_keytab).read_bytes()
        self.source = KerberosSource.objects.create(
            name="test",
            slug="test",
            # The keytab is stored on the model as base64 text.
            spnego_keytab=b64encode(keytab_bytes).decode(),
        )
        # Force the GSSAPI store to be created now, before any test runs.
        self.source.get_gssapi_store()

    def test_api_read(self):
        """Check that a logged-in admin can read the source's API detail view."""
        admin = create_test_admin_user()
        self.client.force_login(admin)

        detail_url = reverse(
            "authentik_api:kerberossource-detail",
            kwargs={"slug": self.source.slug},
        )
        response = self.client.get(detail_url)
        self.assertEqual(response.status_code, 200)

    @skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
    def test_source_login(self):
        """Drive the SPNEGO login view as a GSSAPI initiator.

        The login view must redirect, the redirect target must challenge with
        a bare ``Negotiate`` header, and the token exchange must finish with a
        400 response (no enroll flow is configured for the source).
        """
        unauthorized = 401

        login_url = reverse(
            "authentik_sources_kerberos:spnego-login",
            kwargs={"source_slug": self.source.slug},
        )
        response = self.client.get(login_url)
        self.assertEqual(response.status_code, 302)

        # Every further request goes to the redirect target.
        endpoint = response.headers["Location"]

        # Without an Authorization header the endpoint must ask for Negotiate.
        response = self.client.get(endpoint)
        self.assertEqual(response.status_code, unauthorized)
        self.assertEqual(response.headers["WWW-Authenticate"], "Negotiate")

        server_name = gssapi.names.Name("HTTP/testserver@")
        initiator_creds = gssapi.creds.Credentials(
            usage="initiate", store={"ccache": self.realm.ccache}
        )
        client_ctx = gssapi.sec_contexts.SecurityContext(
            name=server_name, usage="initiate", creds=initiator_creds
        )

        # Exchange tokens until the server stops answering 401, the client
        # context is complete, or the client has nothing more to send.
        status = unauthorized
        server_token = None
        while status == unauthorized and not client_ctx.complete:
            client_token = client_ctx.step(server_token)
            if not client_token:
                break
            authorization = f"Negotiate {b64encode(client_token).decode('ascii')}"
            response = self.client.get(endpoint, headers={"Authorization": authorization})
            status = response.status_code
            if status == unauthorized:
                challenge = response.headers["WWW-Authenticate"]
                # Skip the "Negotiate" scheme; the rest is the base64 server token.
                server_token = b64decode(challenge[len("Negotiate"):].strip())

        # 400 because no enroll flow
        self.assertEqual(status, 400)
class TestSPNEGOSource(KerberosTestCase):
    """SPNEGO tests for the Kerberos source: API read access and the login view."""

    def setUp(self):
        """Create the Kerberos source under test from the realm's HTTP keytab."""
        keytab_bytes = Path(self.realm.http_keytab).read_bytes()
        self.source = KerberosSource.objects.create(
            name="test",
            slug="test",
            # The keytab is stored on the model as base64 text.
            spnego_keytab=b64encode(keytab_bytes).decode(),
        )
        # Force the GSSAPI store to be created now, before any test runs.
        self.source.get_gssapi_store()

    def test_api_read(self):
        """Check that a logged-in admin can read the source's API detail view."""
        admin = create_test_admin_user()
        self.client.force_login(admin)

        detail_url = reverse(
            "authentik_api:kerberossource-detail",
            kwargs={"slug": self.source.slug},
        )
        response = self.client.get(detail_url)
        self.assertEqual(response.status_code, 200)

    @skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
    def test_source_login(self):
        """Drive the SPNEGO login view as a GSSAPI initiator.

        The login view must redirect, the redirect target must challenge with
        a bare ``Negotiate`` header, and the token exchange must finish with a
        400 response (no enroll flow is configured for the source).
        """
        unauthorized = 401

        login_url = reverse(
            "authentik_sources_kerberos:spnego-login",
            kwargs={"source_slug": self.source.slug},
        )
        response = self.client.get(login_url)
        self.assertEqual(response.status_code, 302)

        # Every further request goes to the redirect target.
        endpoint = response.headers["Location"]

        # Without an Authorization header the endpoint must ask for Negotiate.
        response = self.client.get(endpoint)
        self.assertEqual(response.status_code, unauthorized)
        self.assertEqual(response.headers["WWW-Authenticate"], "Negotiate")

        server_name = gssapi.names.Name("HTTP/testserver@")
        initiator_creds = gssapi.creds.Credentials(
            usage="initiate", store={"ccache": self.realm.ccache}
        )
        client_ctx = gssapi.sec_contexts.SecurityContext(
            name=server_name, usage="initiate", creds=initiator_creds
        )

        # Exchange tokens until the server stops answering 401, the client
        # context is complete, or the client has nothing more to send.
        status = unauthorized
        server_token = None
        while status == unauthorized and not client_ctx.complete:
            client_token = client_ctx.step(server_token)
            if not client_token:
                break
            authorization = f"Negotiate {b64encode(client_token).decode('ascii')}"
            response = self.client.get(endpoint, headers={"Authorization": authorization})
            status = response.status_code
            if status == unauthorized:
                challenge = response.headers["WWW-Authenticate"]
                # Skip the "Negotiate" scheme; the rest is the base64 server token.
                server_token = b64decode(challenge[len("Negotiate"):].strip())

        # 400 because no enroll flow
        self.assertEqual(status, 400)
Kerberos Source SPNEGO tests
def
setUp(self):
def setUp(self):
    """Create the Kerberos source under test from the realm's HTTP keytab."""
    keytab_bytes = Path(self.realm.http_keytab).read_bytes()
    self.source = KerberosSource.objects.create(
        name="test",
        slug="test",
        # The keytab is stored on the model as base64 text.
        spnego_keytab=b64encode(keytab_bytes).decode(),
    )
    # Force the GSSAPI store to be created now, before any test runs.
    self.source.get_gssapi_store()
Hook method for setting up the test fixture before exercising it.
def
test_api_read(self):
def test_api_read(self):
    """Check that a logged-in admin can read the source's API detail view."""
    admin = create_test_admin_user()
    self.client.force_login(admin)

    detail_url = reverse(
        "authentik_api:kerberossource-detail",
        kwargs={"slug": self.source.slug},
    )
    response = self.client.get(detail_url)
    self.assertEqual(response.status_code, 200)
Test reading a source
@skipUnless(platform.startswith('linux'), 'Requires compatible GSSAPI implementation')
def
test_source_login(self):
@skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
def test_source_login(self):
    """Drive the SPNEGO login view as a GSSAPI initiator.

    The login view must redirect, the redirect target must challenge with
    a bare ``Negotiate`` header, and the token exchange must finish with a
    400 response (no enroll flow is configured for the source).
    """
    unauthorized = 401

    login_url = reverse(
        "authentik_sources_kerberos:spnego-login",
        kwargs={"source_slug": self.source.slug},
    )
    response = self.client.get(login_url)
    self.assertEqual(response.status_code, 302)

    # Every further request goes to the redirect target.
    endpoint = response.headers["Location"]

    # Without an Authorization header the endpoint must ask for Negotiate.
    response = self.client.get(endpoint)
    self.assertEqual(response.status_code, unauthorized)
    self.assertEqual(response.headers["WWW-Authenticate"], "Negotiate")

    server_name = gssapi.names.Name("HTTP/testserver@")
    initiator_creds = gssapi.creds.Credentials(
        usage="initiate", store={"ccache": self.realm.ccache}
    )
    client_ctx = gssapi.sec_contexts.SecurityContext(
        name=server_name, usage="initiate", creds=initiator_creds
    )

    # Exchange tokens until the server stops answering 401, the client
    # context is complete, or the client has nothing more to send.
    status = unauthorized
    server_token = None
    while status == unauthorized and not client_ctx.complete:
        client_token = client_ctx.step(server_token)
        if not client_token:
            break
        authorization = f"Negotiate {b64encode(client_token).decode('ascii')}"
        response = self.client.get(endpoint, headers={"Authorization": authorization})
        status = response.status_code
        if status == unauthorized:
            challenge = response.headers["WWW-Authenticate"]
            # Skip the "Negotiate" scheme; the rest is the base64 server token.
            server_token = b64decode(challenge[len("Negotiate"):].strip())

    # 400 because no enroll flow
    self.assertEqual(status, 400)
test login view