authentik.enterprise.search.ql

DjangoQL search

 1"""DjangoQL search"""
 2
 3from django.apps import apps
 4from django.db.models import QuerySet
 5from djangoql.ast import Name
 6from djangoql.exceptions import DjangoQLError
 7from djangoql.queryset import apply_search
 8from djangoql.schema import DjangoQLSchema
 9from drf_spectacular.plumbing import ResolvedComponent, build_object_type
10from rest_framework.filters import SearchFilter
11from rest_framework.request import Request
12from structlog.stdlib import get_logger
13
14from authentik.enterprise.search.fields import JSONSearchField
15
16LOGGER = get_logger()
17AUTOCOMPLETE_SCHEMA = ResolvedComponent(
18    name="Autocomplete",
19    object="Autocomplete",
20    type=ResolvedComponent.SCHEMA,
21    schema=build_object_type(additionalProperties={}),
22)
23
24
25class BaseSchema(DjangoQLSchema):
26    """Base Schema which deals with JSON Fields"""
27
28    def resolve_name(self, name: Name):
29        model = self.model_label(self.current_model)
30        root_field = name.parts[0]
31        field = self.models[model].get(root_field)
32        # If the query goes into a JSON field, return the root
33        # field as the JSON field will do the rest
34        if isinstance(field, JSONSearchField):
35            # This is a workaround; build_filter will remove the right-most
36            # entry in the path as that is intended to be the same as the field
37            # however for JSON that is not the case
38            if name.parts[-1] != root_field:
39                name.parts.append(root_field)
40            return field
41        return super().resolve_name(name)
42
43
44class QLSearch(SearchFilter):
45    """rest_framework search filter which uses DjangoQL"""
46
47    def __init__(self):
48        super().__init__()
49        self._fallback = SearchFilter()
50
51    @property
52    def enabled(self):
53        return apps.get_app_config("authentik_enterprise").enabled()
54
55    def get_search_terms(self, request: Request) -> str:
56        """Search terms are set by a ?search=... query parameter,
57        and may be comma and/or whitespace delimited."""
58        params = request.query_params.get("search", "")
59        params = params.replace("\x00", "")  # strip null characters
60        return params
61
62    def get_schema(self, request: Request, view) -> BaseSchema:
63        ql_fields = []
64        if hasattr(view, "get_ql_fields"):
65            ql_fields = view.get_ql_fields()
66
67        class InlineSchema(BaseSchema):
68            def get_fields(self, model):
69                return ql_fields or []
70
71        return InlineSchema
72
73    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
74        search_query = self.get_search_terms(request)
75        schema = self.get_schema(request, view)
76        if len(search_query) == 0 or not self.enabled:
77            return self._fallback.filter_queryset(request, queryset, view)
78        try:
79            return apply_search(queryset, search_query, schema=schema)
80        except DjangoQLError as exc:
81            LOGGER.debug("Failed to parse search expression", exc=exc)
82            return self._fallback.filter_queryset(request, queryset, view)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
AUTOCOMPLETE_SCHEMA = <drf_spectacular.plumbing.ResolvedComponent object>
class BaseSchema(djangoql.schema.DjangoQLSchema):
26class BaseSchema(DjangoQLSchema):
27    """Base Schema which deals with JSON Fields"""
28
29    def resolve_name(self, name: Name):
30        model = self.model_label(self.current_model)
31        root_field = name.parts[0]
32        field = self.models[model].get(root_field)
33        # If the query goes into a JSON field, return the root
34        # field as the JSON field will do the rest
35        if isinstance(field, JSONSearchField):
36            # This is a workaround; build_filter will remove the right-most
37            # entry in the path as that is intended to be the same as the field
38            # however for JSON that is not the case
39            if name.parts[-1] != root_field:
40                name.parts.append(root_field)
41            return field
42        return super().resolve_name(name)

Base Schema which deals with JSON Fields

def resolve_name(self, name: djangoql.ast.Name):
29    def resolve_name(self, name: Name):
30        model = self.model_label(self.current_model)
31        root_field = name.parts[0]
32        field = self.models[model].get(root_field)
33        # If the query goes into a JSON field, return the root
34        # field as the JSON field will do the rest
35        if isinstance(field, JSONSearchField):
36            # This is a workaround; build_filter will remove the right-most
37            # entry in the path as that is intended to be the same as the field
38            # however for JSON that is not the case
39            if name.parts[-1] != root_field:
40                name.parts.append(root_field)
41            return field
42        return super().resolve_name(name)
class QLSearch(rest_framework.filters.SearchFilter):
45class QLSearch(SearchFilter):
46    """rest_framework search filter which uses DjangoQL"""
47
48    def __init__(self):
49        super().__init__()
50        self._fallback = SearchFilter()
51
52    @property
53    def enabled(self):
54        return apps.get_app_config("authentik_enterprise").enabled()
55
56    def get_search_terms(self, request: Request) -> str:
57        """Search terms are set by a ?search=... query parameter,
58        and may be comma and/or whitespace delimited."""
59        params = request.query_params.get("search", "")
60        params = params.replace("\x00", "")  # strip null characters
61        return params
62
63    def get_schema(self, request: Request, view) -> BaseSchema:
64        ql_fields = []
65        if hasattr(view, "get_ql_fields"):
66            ql_fields = view.get_ql_fields()
67
68        class InlineSchema(BaseSchema):
69            def get_fields(self, model):
70                return ql_fields or []
71
72        return InlineSchema
73
74    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
75        search_query = self.get_search_terms(request)
76        schema = self.get_schema(request, view)
77        if len(search_query) == 0 or not self.enabled:
78            return self._fallback.filter_queryset(request, queryset, view)
79        try:
80            return apply_search(queryset, search_query, schema=schema)
81        except DjangoQLError as exc:
82            LOGGER.debug("Failed to parse search expression", exc=exc)
83            return self._fallback.filter_queryset(request, queryset, view)

rest_framework search filter which uses DjangoQL

enabled
52    @property
53    def enabled(self):
54        return apps.get_app_config("authentik_enterprise").enabled()
def get_search_terms(self, request: rest_framework.request.Request) -> str:
56    def get_search_terms(self, request: Request) -> str:
57        """Search terms are set by a ?search=... query parameter,
58        and may be comma and/or whitespace delimited."""
59        params = request.query_params.get("search", "")
60        params = params.replace("\x00", "")  # strip null characters
61        return params

Search terms are set by a ?search=... query parameter, and may be comma and/or whitespace delimited.

def get_schema( self, request: rest_framework.request.Request, view) -> BaseSchema:
63    def get_schema(self, request: Request, view) -> BaseSchema:
64        ql_fields = []
65        if hasattr(view, "get_ql_fields"):
66            ql_fields = view.get_ql_fields()
67
68        class InlineSchema(BaseSchema):
69            def get_fields(self, model):
70                return ql_fields or []
71
72        return InlineSchema
def filter_queryset( self, request: rest_framework.request.Request, queryset: django.db.models.query.QuerySet, view) -> django.db.models.query.QuerySet:
74    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
75        search_query = self.get_search_terms(request)
76        schema = self.get_schema(request, view)
77        if len(search_query) == 0 or not self.enabled:
78            return self._fallback.filter_queryset(request, queryset, view)
79        try:
80            return apply_search(queryset, search_query, schema=schema)
81        except DjangoQLError as exc:
82            LOGGER.debug("Failed to parse search expression", exc=exc)
83            return self._fallback.filter_queryset(request, queryset, view)

Return a filtered queryset.