authentik.api.search.ql
DjangoQL search
"""DjangoQL search"""

from django.db.models import QuerySet
from djangoql.ast import Name
from djangoql.exceptions import DjangoQLError
from djangoql.queryset import apply_search
from djangoql.schema import DjangoQLSchema
from rest_framework.filters import SearchFilter
from rest_framework.request import Request
from structlog.stdlib import get_logger

from authentik.api.search.fields import JSONSearchField

LOGGER = get_logger()


class BaseSchema(DjangoQLSchema):
    """Base Schema which deals with JSON Fields"""

    def resolve_name(self, name: Name):
        """Resolve a query name, short-circuiting lookups that go into a JSON field.

        The first part of ``name`` is looked up in the fields registered for the
        current model. When that field is a ``JSONSearchField`` it is returned
        directly and the JSON field handles the rest of the path; every other
        name is resolved by the parent schema.

        Note that ``name.parts`` may be mutated in place (see the comment below).
        """
        model = self.model_label(self.current_model)
        root_field = name.parts[0]
        field = self.models[model].get(root_field)
        # If the query goes into a JSON field, return the root
        # field as the JSON field will do the rest
        if isinstance(field, JSONSearchField):
            # This is a workaround; build_filter will remove the right-most
            # entry in the path as that is intended to be the same as the field
            # however for JSON that is not the case
            if name.parts[-1] != root_field:
                name.parts.append(root_field)
            return field
        return super().resolve_name(name)


class QLSearch(SearchFilter):
    """rest_framework search filter which uses DjangoQL"""

    def __init__(self):
        super().__init__()
        # Plain DRF search filter, used when there is no search query or when
        # the query cannot be handled as a DjangoQL expression.
        self._fallback = SearchFilter()

    def get_search_terms(self, request: Request) -> str:
        """Return the raw ``?search=...`` query parameter.

        The value is returned as a single, unsplit string with null characters
        removed; an absent parameter yields an empty string.
        """
        params = request.query_params.get("search", "")
        return params.replace("\x00", "")  # strip null characters

    def get_schema(self, request: Request, view) -> BaseSchema:
        """Build a schema class exposing the view's DjangoQL fields.

        The fields come from ``view.get_ql_fields()`` when the view defines that
        method; otherwise the schema exposes no fields. The returned object is a
        ``BaseSchema`` subclass (a class, not an instance).
        """
        ql_fields = []
        if hasattr(view, "get_ql_fields"):
            ql_fields = view.get_ql_fields()

        class InlineSchema(BaseSchema):
            def get_fields(self, model):
                # The same field list is returned for every model.
                return ql_fields or []

        return InlineSchema

    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
        """Filter ``queryset`` with the DjangoQL expression from the request.

        An empty search string, or one that raises ``DjangoQLError``, is handed
        to the plain ``SearchFilter`` fallback instead.
        """
        search_query = self.get_search_terms(request)
        if not search_query:
            # Nothing to parse: skip building the schema altogether.
            return self._fallback.filter_queryset(request, queryset, view)
        schema = self.get_schema(request, view)
        try:
            return apply_search(queryset, search_query, schema=schema)
        except DjangoQLError as exc:
            LOGGER.debug("Failed to parse search expression", exc=exc)
            return self._fallback.filter_queryset(request, queryset, view)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class
BaseSchema(djangoql.schema.DjangoQLSchema):
class BaseSchema(DjangoQLSchema):
    """DjangoQL schema with special handling for names that point into JSON fields"""

    def resolve_name(self, name: Name):
        """Resolve ``name``, returning the root field directly for JSON lookups.

        Names whose first part is not a ``JSONSearchField`` on the current model
        are resolved by the parent schema. ``name.parts`` may be mutated in place.
        """
        label = self.model_label(self.current_model)
        head = name.parts[0]
        candidate = self.models[label].get(head)
        if not isinstance(candidate, JSONSearchField):
            return super().resolve_name(name)
        # The query goes into a JSON field: hand back the root field and let the
        # JSON field deal with the remainder of the path.
        # Workaround: build_filter removes the right-most entry of the path
        # because it is intended to be the same as the field; for JSON that is
        # not the case, so the root name is appended to be removed instead.
        if name.parts[-1] != head:
            name.parts.append(head)
        return candidate
Base Schema which deals with JSON Fields
def
resolve_name(self, name: djangoql.ast.Name):
def resolve_name(self, name: Name):
    """Resolve ``name``, returning the root field directly for JSON lookups.

    Names whose first part is not a ``JSONSearchField`` on the current model
    are resolved by the parent schema. ``name.parts`` may be mutated in place.
    """
    label = self.model_label(self.current_model)
    head = name.parts[0]
    candidate = self.models[label].get(head)
    if not isinstance(candidate, JSONSearchField):
        return super().resolve_name(name)
    # The query goes into a JSON field: hand back the root field and let the
    # JSON field deal with the remainder of the path.
    # Workaround: build_filter removes the right-most entry of the path
    # because it is intended to be the same as the field; for JSON that is
    # not the case, so the root name is appended to be removed instead.
    if name.parts[-1] != head:
        name.parts.append(head)
    return candidate
class
QLSearch(rest_framework.filters.SearchFilter):
class QLSearch(SearchFilter):
    """rest_framework search filter which uses DjangoQL"""

    def __init__(self):
        super().__init__()
        # Plain DRF search filter, used when there is no search query or when
        # the query cannot be handled as a DjangoQL expression.
        self._fallback = SearchFilter()

    def get_search_terms(self, request: Request) -> str:
        """Return the raw ``?search=...`` query parameter.

        The value is returned as a single, unsplit string with null characters
        removed; an absent parameter yields an empty string.
        """
        params = request.query_params.get("search", "")
        return params.replace("\x00", "")  # strip null characters

    def get_schema(self, request: Request, view) -> BaseSchema:
        """Build a schema class exposing the view's DjangoQL fields.

        The fields come from ``view.get_ql_fields()`` when the view defines that
        method; otherwise the schema exposes no fields. The returned object is a
        ``BaseSchema`` subclass (a class, not an instance).
        """
        ql_fields = []
        if hasattr(view, "get_ql_fields"):
            ql_fields = view.get_ql_fields()

        class InlineSchema(BaseSchema):
            def get_fields(self, model):
                # The same field list is returned for every model.
                return ql_fields or []

        return InlineSchema

    def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
        """Filter ``queryset`` with the DjangoQL expression from the request.

        An empty search string, or one that raises ``DjangoQLError``, is handed
        to the plain ``SearchFilter`` fallback instead.
        """
        search_query = self.get_search_terms(request)
        if not search_query:
            # Nothing to parse: skip building the schema altogether.
            return self._fallback.filter_queryset(request, queryset, view)
        schema = self.get_schema(request, view)
        try:
            return apply_search(queryset, search_query, schema=schema)
        except DjangoQLError as exc:
            LOGGER.debug("Failed to parse search expression", exc=exc)
            return self._fallback.filter_queryset(request, queryset, view)
rest_framework search filter which uses DjangoQL
def
get_search_terms(self, request: rest_framework.request.Request) -> str:
def get_search_terms(self, request: Request) -> str:
    """Return the raw ``?search=...`` query parameter.

    The value comes back as one unsplit string with null characters
    removed; a request without the parameter yields an empty string.
    """
    raw_query = request.query_params.get("search", "")
    # strip null characters
    return raw_query.replace("\x00", "")
Search terms are set by a ?search=... query parameter, and may be comma and/or whitespace delimited.
def
filter_queryset( self, request: rest_framework.request.Request, queryset: django.db.models.query.QuerySet, view) -> django.db.models.query.QuerySet:
def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet:
    """Filter ``queryset`` with the DjangoQL expression from the request.

    An empty search string, or one that raises ``DjangoQLError``, is handed
    to the plain ``SearchFilter`` fallback instead.
    """
    search_query = self.get_search_terms(request)
    if not search_query:
        # Nothing to parse: skip building the schema altogether.
        return self._fallback.filter_queryset(request, queryset, view)
    schema = self.get_schema(request, view)
    try:
        return apply_search(queryset, search_query, schema=schema)
    except DjangoQLError as exc:
        LOGGER.debug("Failed to parse search expression", exc=exc)
        return self._fallback.filter_queryset(request, queryset, view)
Return a filtered queryset.