authentik.api.search.ql

DjangoQL search

 1"""DjangoQL search"""
 2
 3from django.db.models import QuerySet
 4from djangoql.ast import Name
 5from djangoql.exceptions import DjangoQLError
 6from djangoql.queryset import apply_search
 7from djangoql.schema import DjangoQLSchema
 8from rest_framework.filters import SearchFilter
 9from rest_framework.request import Request
10from structlog.stdlib import get_logger
11
12from authentik.api.search.fields import JSONSearchField
13
14LOGGER = get_logger()
15
16
17class BaseSchema(DjangoQLSchema):
18    """Base Schema which deals with JSON Fields"""
19
20    def resolve_name(self, name: Name):
21        model = self.model_label(self.current_model)
22        root_field = name.parts[0]
23        field = self.models[model].get(root_field)
24        # If the query goes into a JSON field, return the root
25        # field as the JSON field will do the rest
26        if isinstance(field, JSONSearchField):
27            # This is a workaround; build_filter will remove the right-most
28            # entry in the path as that is intended to be the same as the field
29            # however for JSON that is not the case
30            if name.parts[-1] != root_field:
31                name.parts.append(root_field)
32            return field
33        return super().resolve_name(name)
34
35
36class QLSearch(SearchFilter):
37    """rest_framework search filter which uses DjangoQL"""
38
39    def __init__(self):
40        super().__init__()
41        self._fallback = SearchFilter()
42
43    def get_search_terms(self, request: Request) -> str:
44        """Search terms are set by a ?search=... query parameter,
45        and may be comma and/or whitespace delimited."""
46        params = request.query_params.get("search", "")
47        params = params.replace("\x00", "")  # strip null characters
48        return params
49
50    def get_schema(self, request: Request, view) -> BaseSchema:
51        ql_fields = []
52        if hasattr(view, "get_ql_fields"):
53            ql_fields = view.get_ql_fields()
54
55        class InlineSchema(BaseSchema):
56            def get_fields(self, model):
57                return ql_fields or []
58
59        return InlineSchema
60
61    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
62        search_query = self.get_search_terms(request)
63        schema = self.get_schema(request, view)
64        if len(search_query) == 0:
65            return self._fallback.filter_queryset(request, queryset, view)
66        try:
67            return apply_search(queryset, search_query, schema=schema)
68        except DjangoQLError as exc:
69            LOGGER.debug("Failed to parse search expression", exc=exc)
70            return self._fallback.filter_queryset(request, queryset, view)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class BaseSchema(djangoql.schema.DjangoQLSchema):
18class BaseSchema(DjangoQLSchema):
19    """Base Schema which deals with JSON Fields"""
20
21    def resolve_name(self, name: Name):
22        model = self.model_label(self.current_model)
23        root_field = name.parts[0]
24        field = self.models[model].get(root_field)
25        # If the query goes into a JSON field, return the root
26        # field as the JSON field will do the rest
27        if isinstance(field, JSONSearchField):
28            # This is a workaround; build_filter will remove the right-most
29            # entry in the path as that is intended to be the same as the field
30            # however for JSON that is not the case
31            if name.parts[-1] != root_field:
32                name.parts.append(root_field)
33            return field
34        return super().resolve_name(name)

Base Schema which deals with JSON Fields

def resolve_name(self, name: djangoql.ast.Name):
21    def resolve_name(self, name: Name):
22        model = self.model_label(self.current_model)
23        root_field = name.parts[0]
24        field = self.models[model].get(root_field)
25        # If the query goes into a JSON field, return the root
26        # field as the JSON field will do the rest
27        if isinstance(field, JSONSearchField):
28            # This is a workaround; build_filter will remove the right-most
29            # entry in the path as that is intended to be the same as the field
30            # however for JSON that is not the case
31            if name.parts[-1] != root_field:
32                name.parts.append(root_field)
33            return field
34        return super().resolve_name(name)
class QLSearch(rest_framework.filters.SearchFilter):
37class QLSearch(SearchFilter):
38    """rest_framework search filter which uses DjangoQL"""
39
40    def __init__(self):
41        super().__init__()
42        self._fallback = SearchFilter()
43
44    def get_search_terms(self, request: Request) -> str:
45        """Search terms are set by a ?search=... query parameter,
46        and may be comma and/or whitespace delimited."""
47        params = request.query_params.get("search", "")
48        params = params.replace("\x00", "")  # strip null characters
49        return params
50
51    def get_schema(self, request: Request, view) -> BaseSchema:
52        ql_fields = []
53        if hasattr(view, "get_ql_fields"):
54            ql_fields = view.get_ql_fields()
55
56        class InlineSchema(BaseSchema):
57            def get_fields(self, model):
58                return ql_fields or []
59
60        return InlineSchema
61
62    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
63        search_query = self.get_search_terms(request)
64        schema = self.get_schema(request, view)
65        if len(search_query) == 0:
66            return self._fallback.filter_queryset(request, queryset, view)
67        try:
68            return apply_search(queryset, search_query, schema=schema)
69        except DjangoQLError as exc:
70            LOGGER.debug("Failed to parse search expression", exc=exc)
71            return self._fallback.filter_queryset(request, queryset, view)

rest_framework search filter which uses DjangoQL

def get_search_terms(self, request: rest_framework.request.Request) -> str:
44    def get_search_terms(self, request: Request) -> str:
45        """Search terms are set by a ?search=... query parameter,
46        and may be comma and/or whitespace delimited."""
47        params = request.query_params.get("search", "")
48        params = params.replace("\x00", "")  # strip null characters
49        return params

Search terms are set by a ?search=... query parameter, and may be comma and/or whitespace delimited.

def get_schema( self, request: rest_framework.request.Request, view) -> BaseSchema:
51    def get_schema(self, request: Request, view) -> BaseSchema:
52        ql_fields = []
53        if hasattr(view, "get_ql_fields"):
54            ql_fields = view.get_ql_fields()
55
56        class InlineSchema(BaseSchema):
57            def get_fields(self, model):
58                return ql_fields or []
59
60        return InlineSchema
def filter_queryset( self, request: rest_framework.request.Request, queryset: django.db.models.query.QuerySet, view) -> django.db.models.query.QuerySet:
62    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
63        search_query = self.get_search_terms(request)
64        schema = self.get_schema(request, view)
65        if len(search_query) == 0:
66            return self._fallback.filter_queryset(request, queryset, view)
67        try:
68            return apply_search(queryset, search_query, schema=schema)
69        except DjangoQLError as exc:
70            LOGGER.debug("Failed to parse search expression", exc=exc)
71            return self._fallback.filter_queryset(request, queryset, view)

Return a filtered queryset.