authentik.tenants.channels

 1from channels.db import database_sync_to_async
 2from django.db import close_old_connections, connection
 3from django.http.request import split_domain_port
 4from django_tenants.utils import (
 5    get_public_schema_name,
 6    remove_www,
 7)
 8
 9from authentik.tenants.models import Domain, Tenant
10
11
12class TenantsAwareMiddleware:
13    """Set the database schema for use with django-tenants"""
14
15    def __init__(self, inner):
16        self.inner = inner
17
18    def get_hostname_from_scope(self, scope: list[tuple[bytes, bytes]]) -> str | None:
19        headers = {k.replace(b"-", b"_").upper(): v for k, v in scope.get("headers", [])}
20        hostname, _ = split_domain_port(headers.get(b"HOST", b"").decode("utf-8"))
21        if not hostname:
22            return None
23        return remove_www(hostname)
24
25    async def get_default_tenant(self) -> Tenant:
26        return await database_sync_to_async(Tenant.objects.get)(
27            schema_name=get_public_schema_name()
28        )
29
30    async def get_tenant(self, hostname: str | None) -> Tenant:
31        if not hostname:
32            return await self.get_default_tenant()
33
34        try:
35            domain = await database_sync_to_async(Domain.objects.select_related("tenant").get)(
36                domain=hostname
37            )
38        except Domain.DoesNotExist:
39            return await self.get_default_tenant()
40        return domain.tenant
41
42    async def __call__(self, scope, receive, send):
43        close_old_connections()
44        hostname = self.get_hostname_from_scope(scope)
45        tenant = await self.get_tenant(hostname)
46        scope["tenant"] = tenant
47        connection.set_tenant(tenant)
48        return await self.inner(scope, receive, send)
class TenantsAwareMiddleware:
13class TenantsAwareMiddleware:
14    """Set the database schema for use with django-tenants"""
15
16    def __init__(self, inner):
17        self.inner = inner
18
19    def get_hostname_from_scope(self, scope: list[tuple[bytes, bytes]]) -> str | None:
20        headers = {k.replace(b"-", b"_").upper(): v for k, v in scope.get("headers", [])}
21        hostname, _ = split_domain_port(headers.get(b"HOST", b"").decode("utf-8"))
22        if not hostname:
23            return None
24        return remove_www(hostname)
25
26    async def get_default_tenant(self) -> Tenant:
27        return await database_sync_to_async(Tenant.objects.get)(
28            schema_name=get_public_schema_name()
29        )
30
31    async def get_tenant(self, hostname: str | None) -> Tenant:
32        if not hostname:
33            return await self.get_default_tenant()
34
35        try:
36            domain = await database_sync_to_async(Domain.objects.select_related("tenant").get)(
37                domain=hostname
38            )
39        except Domain.DoesNotExist:
40            return await self.get_default_tenant()
41        return domain.tenant
42
43    async def __call__(self, scope, receive, send):
44        close_old_connections()
45        hostname = self.get_hostname_from_scope(scope)
46        tenant = await self.get_tenant(hostname)
47        scope["tenant"] = tenant
48        connection.set_tenant(tenant)
49        return await self.inner(scope, receive, send)

Set the database schema for use with django-tenants

TenantsAwareMiddleware(inner)
16    def __init__(self, inner):
17        self.inner = inner
inner
def get_hostname_from_scope(self, scope: list[tuple[bytes, bytes]]) -> str | None:
19    def get_hostname_from_scope(self, scope: list[tuple[bytes, bytes]]) -> str | None:
20        headers = {k.replace(b"-", b"_").upper(): v for k, v in scope.get("headers", [])}
21        hostname, _ = split_domain_port(headers.get(b"HOST", b"").decode("utf-8"))
22        if not hostname:
23            return None
24        return remove_www(hostname)
async def get_default_tenant(self) -> authentik.tenants.models.Tenant:
26    async def get_default_tenant(self) -> Tenant:
27        return await database_sync_to_async(Tenant.objects.get)(
28            schema_name=get_public_schema_name()
29        )
async def get_tenant(self, hostname: str | None) -> authentik.tenants.models.Tenant:
31    async def get_tenant(self, hostname: str | None) -> Tenant:
32        if not hostname:
33            return await self.get_default_tenant()
34
35        try:
36            domain = await database_sync_to_async(Domain.objects.select_related("tenant").get)(
37                domain=hostname
38            )
39        except Domain.DoesNotExist:
40            return await self.get_default_tenant()
41        return domain.tenant