authentik.enterprise.search.ql
DjangoQL search
1"""DjangoQL search""" 2 3from django.apps import apps 4from django.db.models import QuerySet 5from djangoql.ast import Name 6from djangoql.exceptions import DjangoQLError 7from djangoql.queryset import apply_search 8from djangoql.schema import DjangoQLSchema 9from drf_spectacular.plumbing import ResolvedComponent, build_object_type 10from rest_framework.filters import SearchFilter 11from rest_framework.request import Request 12from structlog.stdlib import get_logger 13 14from authentik.enterprise.search.fields import JSONSearchField 15 16LOGGER = get_logger() 17AUTOCOMPLETE_SCHEMA = ResolvedComponent( 18 name="Autocomplete", 19 object="Autocomplete", 20 type=ResolvedComponent.SCHEMA, 21 schema=build_object_type(additionalProperties={}), 22) 23 24 25class BaseSchema(DjangoQLSchema): 26 """Base Schema which deals with JSON Fields""" 27 28 def resolve_name(self, name: Name): 29 model = self.model_label(self.current_model) 30 root_field = name.parts[0] 31 field = self.models[model].get(root_field) 32 # If the query goes into a JSON field, return the root 33 # field as the JSON field will do the rest 34 if isinstance(field, JSONSearchField): 35 # This is a workaround; build_filter will remove the right-most 36 # entry in the path as that is intended to be the same as the field 37 # however for JSON that is not the case 38 if name.parts[-1] != root_field: 39 name.parts.append(root_field) 40 return field 41 return super().resolve_name(name) 42 43 44class QLSearch(SearchFilter): 45 """rest_framework search filter which uses DjangoQL""" 46 47 def __init__(self): 48 super().__init__() 49 self._fallback = SearchFilter() 50 51 @property 52 def enabled(self): 53 return apps.get_app_config("authentik_enterprise").enabled() 54 55 def get_search_terms(self, request: Request) -> str: 56 """Search terms are set by a ?search=... query parameter, 57 and may be comma and/or whitespace delimited.""" 58 params = request.query_params.get("search", "") 59 params = params.replace("\x00", "") # strip null characters 60 return params 61 62 def get_schema(self, request: Request, view) -> BaseSchema: 63 ql_fields = [] 64 if hasattr(view, "get_ql_fields"): 65 ql_fields = view.get_ql_fields() 66 67 class InlineSchema(BaseSchema): 68 def get_fields(self, model): 69 return ql_fields or [] 70 71 return InlineSchema 72 73 def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet: 74 search_query = self.get_search_terms(request) 75 schema = self.get_schema(request, view) 76 if len(search_query) == 0 or not self.enabled: 77 return self._fallback.filter_queryset(request, queryset, view) 78 try: 79 return apply_search(queryset, search_query, schema=schema) 80 except DjangoQLError as exc: 81 LOGGER.debug("Failed to parse search expression", exc=exc) 82 return self._fallback.filter_queryset(request, queryset, view)
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
AUTOCOMPLETE_SCHEMA =
<drf_spectacular.plumbing.ResolvedComponent object>
class
BaseSchema(djangoql.schema.DjangoQLSchema):
26class BaseSchema(DjangoQLSchema): 27 """Base Schema which deals with JSON Fields""" 28 29 def resolve_name(self, name: Name): 30 model = self.model_label(self.current_model) 31 root_field = name.parts[0] 32 field = self.models[model].get(root_field) 33 # If the query goes into a JSON field, return the root 34 # field as the JSON field will do the rest 35 if isinstance(field, JSONSearchField): 36 # This is a workaround; build_filter will remove the right-most 37 # entry in the path as that is intended to be the same as the field 38 # however for JSON that is not the case 39 if name.parts[-1] != root_field: 40 name.parts.append(root_field) 41 return field 42 return super().resolve_name(name)
Base Schema which deals with JSON Fields
def
resolve_name(self, name: djangoql.ast.Name):
29 def resolve_name(self, name: Name): 30 model = self.model_label(self.current_model) 31 root_field = name.parts[0] 32 field = self.models[model].get(root_field) 33 # If the query goes into a JSON field, return the root 34 # field as the JSON field will do the rest 35 if isinstance(field, JSONSearchField): 36 # This is a workaround; build_filter will remove the right-most 37 # entry in the path as that is intended to be the same as the field 38 # however for JSON that is not the case 39 if name.parts[-1] != root_field: 40 name.parts.append(root_field) 41 return field 42 return super().resolve_name(name)
class
QLSearch(rest_framework.filters.SearchFilter):
45class QLSearch(SearchFilter): 46 """rest_framework search filter which uses DjangoQL""" 47 48 def __init__(self): 49 super().__init__() 50 self._fallback = SearchFilter() 51 52 @property 53 def enabled(self): 54 return apps.get_app_config("authentik_enterprise").enabled() 55 56 def get_search_terms(self, request: Request) -> str: 57 """Search terms are set by a ?search=... query parameter, 58 and may be comma and/or whitespace delimited.""" 59 params = request.query_params.get("search", "") 60 params = params.replace("\x00", "") # strip null characters 61 return params 62 63 def get_schema(self, request: Request, view) -> BaseSchema: 64 ql_fields = [] 65 if hasattr(view, "get_ql_fields"): 66 ql_fields = view.get_ql_fields() 67 68 class InlineSchema(BaseSchema): 69 def get_fields(self, model): 70 return ql_fields or [] 71 72 return InlineSchema 73 74 def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet: 75 search_query = self.get_search_terms(request) 76 schema = self.get_schema(request, view) 77 if len(search_query) == 0 or not self.enabled: 78 return self._fallback.filter_queryset(request, queryset, view) 79 try: 80 return apply_search(queryset, search_query, schema=schema) 81 except DjangoQLError as exc: 82 LOGGER.debug("Failed to parse search expression", exc=exc) 83 return self._fallback.filter_queryset(request, queryset, view)
rest_framework search filter which uses DjangoQL
def
get_search_terms(self, request: rest_framework.request.Request) -> str:
56 def get_search_terms(self, request: Request) -> str: 57 """Search terms are set by a ?search=... query parameter, 58 and may be comma and/or whitespace delimited.""" 59 params = request.query_params.get("search", "") 60 params = params.replace("\x00", "") # strip null characters 61 return params
Search terms are set by a ?search=... query parameter, and may be comma and/or whitespace delimited.
def
filter_queryset( self, request: rest_framework.request.Request, queryset: django.db.models.query.QuerySet, view) -> django.db.models.query.QuerySet:
74 def filter_queryset(self, request: Request, queryset: QuerySet, view) -> QuerySet: 75 search_query = self.get_search_terms(request) 76 schema = self.get_schema(request, view) 77 if len(search_query) == 0 or not self.enabled: 78 return self._fallback.filter_queryset(request, queryset, view) 79 try: 80 return apply_search(queryset, search_query, schema=schema) 81 except DjangoQLError as exc: 82 LOGGER.debug("Failed to parse search expression", exc=exc) 83 return self._fallback.filter_queryset(request, queryset, view)
Return a filtered queryset.