authentik.sources.kerberos.tests.test_spnego

Kerberos Source SPNEGO tests

 1"""Kerberos Source SPNEGO tests"""
 2
 3from base64 import b64decode, b64encode
 4from pathlib import Path
 5from sys import platform
 6from unittest import skipUnless
 7
 8import gssapi
 9from django.urls import reverse
10
11from authentik.core.tests.utils import create_test_admin_user
12from authentik.sources.kerberos.models import KerberosSource
13from authentik.sources.kerberos.tests.utils import KerberosTestCase
14
15
class TestSPNEGOSource(KerberosTestCase):
    """Tests for the Kerberos source's API detail endpoint and SPNEGO login view."""

    def setUp(self):
        """Create a Kerberos source whose SPNEGO keytab is the realm's HTTP keytab."""
        keytab_bytes = Path(self.realm.http_keytab).read_bytes()
        encoded_keytab = b64encode(keytab_bytes).decode()
        self.source = KerberosSource.objects.create(
            name="test",
            slug="test",
            spnego_keytab=encoded_keytab,
        )
        # Trigger store creation now instead of during the first request
        self.source.get_gssapi_store()

    def test_api_read(self):
        """Read the source through its API detail endpoint and expect HTTP 200."""
        admin = create_test_admin_user()
        self.client.force_login(admin)
        detail_url = reverse(
            "authentik_api:kerberossource-detail",
            kwargs={"slug": self.source.slug},
        )
        response = self.client.get(detail_url)
        self.assertEqual(response.status_code, 200)

    @skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
    def test_source_login(self):
        """Drive the SPNEGO login view from the initial redirect through the token exchange."""
        login_url = reverse(
            "authentik_sources_kerberos:spnego-login",
            kwargs={"source_slug": self.source.slug},
        )
        redirect = self.client.get(login_url)
        self.assertEqual(redirect.status_code, 302)

        endpoint = redirect.headers["Location"]

        # Without credentials the endpoint must answer with a bare Negotiate challenge
        challenge = self.client.get(endpoint)
        self.assertEqual(challenge.status_code, 401)
        self.assertEqual(challenge.headers["WWW-Authenticate"], "Negotiate")

        target = gssapi.names.Name("HTTP/testserver@")
        initiator_creds = gssapi.creds.Credentials(
            usage="initiate",
            store={"ccache": self.realm.ccache},
        )
        client_ctx = gssapi.sec_contexts.SecurityContext(
            name=target,
            usage="initiate",
            creds=initiator_creds,
        )

        status = 401
        server_token = None
        # Keep stepping while the server still challenges and the context is unfinished
        while not (status != 401 or client_ctx.complete):  # noqa: PLR2004
            client_token = client_ctx.step(server_token)
            if not client_token:
                break
            encoded_token = b64encode(client_token).decode("ascii")
            reply = self.client.get(
                endpoint,
                headers={"Authorization": f"Negotiate {encoded_token}"},
            )
            status = reply.status_code
            if status != 401:  # noqa: PLR2004
                continue
            # Drop the leading "Negotiate" scheme name; strip() removes the separating space
            raw_token = reply.headers["WWW-Authenticate"][9:].strip()
            server_token = b64decode(raw_token)

        # 400 because no enroll flow
        self.assertEqual(status, 400)
class TestSPNEGOSource(authentik.sources.kerberos.tests.utils.KerberosTestCase):
class TestSPNEGOSource(KerberosTestCase):
    """Tests for the Kerberos source's API detail endpoint and SPNEGO login view."""

    def setUp(self):
        """Create a Kerberos source whose SPNEGO keytab is the realm's HTTP keytab."""
        keytab_bytes = Path(self.realm.http_keytab).read_bytes()
        encoded_keytab = b64encode(keytab_bytes).decode()
        self.source = KerberosSource.objects.create(
            name="test",
            slug="test",
            spnego_keytab=encoded_keytab,
        )
        # Trigger store creation now instead of during the first request
        self.source.get_gssapi_store()

    def test_api_read(self):
        """Read the source through its API detail endpoint and expect HTTP 200."""
        admin = create_test_admin_user()
        self.client.force_login(admin)
        detail_url = reverse(
            "authentik_api:kerberossource-detail",
            kwargs={"slug": self.source.slug},
        )
        response = self.client.get(detail_url)
        self.assertEqual(response.status_code, 200)

    @skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
    def test_source_login(self):
        """Drive the SPNEGO login view from the initial redirect through the token exchange."""
        login_url = reverse(
            "authentik_sources_kerberos:spnego-login",
            kwargs={"source_slug": self.source.slug},
        )
        redirect = self.client.get(login_url)
        self.assertEqual(redirect.status_code, 302)

        endpoint = redirect.headers["Location"]

        # Without credentials the endpoint must answer with a bare Negotiate challenge
        challenge = self.client.get(endpoint)
        self.assertEqual(challenge.status_code, 401)
        self.assertEqual(challenge.headers["WWW-Authenticate"], "Negotiate")

        target = gssapi.names.Name("HTTP/testserver@")
        initiator_creds = gssapi.creds.Credentials(
            usage="initiate",
            store={"ccache": self.realm.ccache},
        )
        client_ctx = gssapi.sec_contexts.SecurityContext(
            name=target,
            usage="initiate",
            creds=initiator_creds,
        )

        status = 401
        server_token = None
        # Keep stepping while the server still challenges and the context is unfinished
        while not (status != 401 or client_ctx.complete):  # noqa: PLR2004
            client_token = client_ctx.step(server_token)
            if not client_token:
                break
            encoded_token = b64encode(client_token).decode("ascii")
            reply = self.client.get(
                endpoint,
                headers={"Authorization": f"Negotiate {encoded_token}"},
            )
            status = reply.status_code
            if status != 401:  # noqa: PLR2004
                continue
            # Drop the leading "Negotiate" scheme name; strip() removes the separating space
            raw_token = reply.headers["WWW-Authenticate"][9:].strip()
            server_token = b64decode(raw_token)

        # 400 because no enroll flow
        self.assertEqual(status, 400)

Kerberos Source SPNEGO tests

def setUp(self):
20    def setUp(self):
21        self.source = KerberosSource.objects.create(
22            name="test",
23            slug="test",
24            spnego_keytab=b64encode(Path(self.realm.http_keytab).read_bytes()).decode(),
25        )
26        # Force store creation early
27        self.source.get_gssapi_store()

Hook method for setting up the test fixture before exercising it.

def test_api_read(self):
29    def test_api_read(self):
30        """Test reading a source"""
31        self.client.force_login(create_test_admin_user())
32        response = self.client.get(
33            reverse(
34                "authentik_api:kerberossource-detail",
35                kwargs={
36                    "slug": self.source.slug,
37                },
38            )
39        )
40        self.assertEqual(response.status_code, 200)

Test reading a source

@skipUnless(platform.startswith('linux'), 'Requires compatible GSSAPI implementation')
def test_source_login(self):
42    @skipUnless(platform.startswith("linux"), "Requires compatible GSSAPI implementation")
43    def test_source_login(self):
44        """test login view"""
45        response = self.client.get(
46            reverse(
47                "authentik_sources_kerberos:spnego-login",
48                kwargs={"source_slug": self.source.slug},
49            )
50        )
51        self.assertEqual(response.status_code, 302)
52
53        endpoint = response.headers["Location"]
54
55        response = self.client.get(endpoint)
56        self.assertEqual(response.status_code, 401)
57        self.assertEqual(response.headers["WWW-Authenticate"], "Negotiate")
58
59        server_name = gssapi.names.Name("HTTP/testserver@")
60        client_creds = gssapi.creds.Credentials(
61            usage="initiate", store={"ccache": self.realm.ccache}
62        )
63        client_ctx = gssapi.sec_contexts.SecurityContext(
64            name=server_name, usage="initiate", creds=client_creds
65        )
66
67        status = 401
68        server_token = None
69        while status == 401 and not client_ctx.complete:  # noqa: PLR2004
70            client_token = client_ctx.step(server_token)
71            if not client_token:
72                break
73            response = self.client.get(
74                endpoint,
75                headers={"Authorization": f"Negotiate {b64encode(client_token).decode('ascii')}"},
76            )
77            status = response.status_code
78            if status == 401:  # noqa: PLR2004
79                server_token = b64decode(response.headers["WWW-Authenticate"][9:].strip())
80
81        # 400 because no enroll flow
82        self.assertEqual(status, 400)

test login view