authentik.enterprise.search.fields

DjangoQL search

  1"""DjangoQL search"""
  2
  3from collections import OrderedDict, defaultdict
  4from collections.abc import Generator
  5
  6from django.db import connection
  7from django.db.models import Model, Q
  8from djangoql.compat import text_type
  9from djangoql.schema import StrField
 10from djangoql.serializers import DjangoQLSchemaSerializer
 11
 12
 13class JSONSearchField(StrField):
 14    """JSON field for DjangoQL"""
 15
 16    model: Model
 17
 18    def __init__(
 19        self,
 20        model=None,
 21        name=None,
 22        nullable=None,
 23        suggest_nested=False,
 24        fixed_structure: OrderedDict | None = None,
 25    ):
 26        # Set this in the constructor to not clobber the type variable
 27        self.type = "relation"
 28        self.suggest_nested = suggest_nested
 29        self.fixed_structure = fixed_structure
 30        super().__init__(model, name, nullable)
 31
 32    def get_lookup(self, path, operator, value):
 33        search = "__".join(path)
 34        op, invert = self.get_operator(operator)
 35        q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
 36        return ~q if invert else q
 37
 38    def json_field_keys(self) -> Generator[tuple[str]]:
 39        with connection.cursor() as cursor:
 40            cursor.execute(f"""
 41                WITH RECURSIVE "{self.name}_keys" AS (
 42                    SELECT
 43                        ARRAY[jsonb_object_keys("{self.name}")] AS key_path_array,
 44                        "{self.name}" -> jsonb_object_keys("{self.name}") AS value
 45                    FROM {self.model._meta.db_table}
 46                    WHERE "{self.name}" IS NOT NULL
 47                        AND jsonb_typeof("{self.name}") = 'object'
 48
 49                    UNION ALL
 50
 51                    SELECT
 52                        ck.key_path_array || jsonb_object_keys(ck.value),
 53                        ck.value -> jsonb_object_keys(ck.value) AS value
 54                    FROM "{self.name}_keys" ck
 55                    WHERE jsonb_typeof(ck.value) = 'object'
 56                ),
 57
 58                unique_paths AS (
 59                    SELECT DISTINCT key_path_array
 60                    FROM "{self.name}_keys"
 61                )
 62
 63                SELECT key_path_array FROM unique_paths;
 64            """)  # nosec
 65            return (x[0] for x in cursor.fetchall())
 66
 67    def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
 68        new_dict = OrderedDict()
 69        if not self.fixed_structure:
 70            return new_dict
 71        new_dict.setdefault(self.relation(), {})
 72        for key, value in self.fixed_structure.items():
 73            new_dict[self.relation()][key] = serializer.serialize_field(value)
 74            if isinstance(value, JSONSearchField):
 75                new_dict.update(value.get_nested_options(serializer))
 76        return new_dict
 77
 78    def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
 79        """Get keys of all nested objects to show autocomplete"""
 80        if not self.suggest_nested:
 81            if self.fixed_structure:
 82                return self.get_fixed_structure(serializer)
 83            return OrderedDict()
 84
 85        def recursive_function(parts: list[str], parent_parts: list[str] | None = None):
 86            if not parent_parts:
 87                parent_parts = []
 88            path = parts.pop(0)
 89            parent_parts.append(path)
 90            relation_key = "_".join(parent_parts)
 91            if len(parts) > 1:
 92                out_dict = {
 93                    relation_key: {
 94                        parts[0]: {
 95                            "type": "relation",
 96                            "relation": f"{relation_key}_{parts[0]}",
 97                        }
 98                    }
 99                }
100                child_paths = recursive_function(parts.copy(), parent_parts.copy())
101                child_paths.update(out_dict)
102                return child_paths
103            else:
104                return {relation_key: {parts[0]: {}}}
105
106        relation_structure = defaultdict(dict)
107
108        for relations in self.json_field_keys():
109            result = recursive_function([self.relation()] + relations)
110            for relation_key, value in result.items():
111                for sub_relation_key, sub_value in value.items():
112                    if not relation_structure[relation_key].get(sub_relation_key, None):
113                        relation_structure[relation_key][sub_relation_key] = sub_value
114                    else:
115                        relation_structure[relation_key][sub_relation_key].update(sub_value)
116
117        final_dict = defaultdict(dict)
118
119        for key, value in relation_structure.items():
120            for sub_key, sub_value in value.items():
121                if not sub_value:
122                    final_dict[key][sub_key] = {
123                        "type": "str",
124                        "nullable": True,
125                    }
126                else:
127                    final_dict[key][sub_key] = sub_value
128        return OrderedDict(final_dict)
129
130    def relation(self) -> str:
131        return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"
132
133
134class ChoiceSearchField(StrField):
135    def __init__(self, model=None, name=None, nullable=None):
136        super().__init__(model, name, nullable, suggest_options=True)
137
138    def get_options(self, search):
139        result = []
140        choices = self._field_choices()
141        if choices:
142            search = search.lower()
143            for c in choices:
144                choice = text_type(c[0])
145                if search in choice.lower():
146                    result.append(choice)
147        return result
class JSONSearchField(djangoql.schema.StrField):
 14class JSONSearchField(StrField):
 15    """JSON field for DjangoQL"""
 16
 17    model: Model
 18
 19    def __init__(
 20        self,
 21        model=None,
 22        name=None,
 23        nullable=None,
 24        suggest_nested=False,
 25        fixed_structure: OrderedDict | None = None,
 26    ):
 27        # Set this in the constructor to not clobber the type variable
 28        self.type = "relation"
 29        self.suggest_nested = suggest_nested
 30        self.fixed_structure = fixed_structure
 31        super().__init__(model, name, nullable)
 32
 33    def get_lookup(self, path, operator, value):
 34        search = "__".join(path)
 35        op, invert = self.get_operator(operator)
 36        q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
 37        return ~q if invert else q
 38
 39    def json_field_keys(self) -> Generator[tuple[str]]:
 40        with connection.cursor() as cursor:
 41            cursor.execute(f"""
 42                WITH RECURSIVE "{self.name}_keys" AS (
 43                    SELECT
 44                        ARRAY[jsonb_object_keys("{self.name}")] AS key_path_array,
 45                        "{self.name}" -> jsonb_object_keys("{self.name}") AS value
 46                    FROM {self.model._meta.db_table}
 47                    WHERE "{self.name}" IS NOT NULL
 48                        AND jsonb_typeof("{self.name}") = 'object'
 49
 50                    UNION ALL
 51
 52                    SELECT
 53                        ck.key_path_array || jsonb_object_keys(ck.value),
 54                        ck.value -> jsonb_object_keys(ck.value) AS value
 55                    FROM "{self.name}_keys" ck
 56                    WHERE jsonb_typeof(ck.value) = 'object'
 57                ),
 58
 59                unique_paths AS (
 60                    SELECT DISTINCT key_path_array
 61                    FROM "{self.name}_keys"
 62                )
 63
 64                SELECT key_path_array FROM unique_paths;
 65            """)  # nosec
 66            return (x[0] for x in cursor.fetchall())
 67
 68    def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
 69        new_dict = OrderedDict()
 70        if not self.fixed_structure:
 71            return new_dict
 72        new_dict.setdefault(self.relation(), {})
 73        for key, value in self.fixed_structure.items():
 74            new_dict[self.relation()][key] = serializer.serialize_field(value)
 75            if isinstance(value, JSONSearchField):
 76                new_dict.update(value.get_nested_options(serializer))
 77        return new_dict
 78
 79    def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
 80        """Get keys of all nested objects to show autocomplete"""
 81        if not self.suggest_nested:
 82            if self.fixed_structure:
 83                return self.get_fixed_structure(serializer)
 84            return OrderedDict()
 85
 86        def recursive_function(parts: list[str], parent_parts: list[str] | None = None):
 87            if not parent_parts:
 88                parent_parts = []
 89            path = parts.pop(0)
 90            parent_parts.append(path)
 91            relation_key = "_".join(parent_parts)
 92            if len(parts) > 1:
 93                out_dict = {
 94                    relation_key: {
 95                        parts[0]: {
 96                            "type": "relation",
 97                            "relation": f"{relation_key}_{parts[0]}",
 98                        }
 99                    }
100                }
101                child_paths = recursive_function(parts.copy(), parent_parts.copy())
102                child_paths.update(out_dict)
103                return child_paths
104            else:
105                return {relation_key: {parts[0]: {}}}
106
107        relation_structure = defaultdict(dict)
108
109        for relations in self.json_field_keys():
110            result = recursive_function([self.relation()] + relations)
111            for relation_key, value in result.items():
112                for sub_relation_key, sub_value in value.items():
113                    if not relation_structure[relation_key].get(sub_relation_key, None):
114                        relation_structure[relation_key][sub_relation_key] = sub_value
115                    else:
116                        relation_structure[relation_key][sub_relation_key].update(sub_value)
117
118        final_dict = defaultdict(dict)
119
120        for key, value in relation_structure.items():
121            for sub_key, sub_value in value.items():
122                if not sub_value:
123                    final_dict[key][sub_key] = {
124                        "type": "str",
125                        "nullable": True,
126                    }
127                else:
128                    final_dict[key][sub_key] = sub_value
129        return OrderedDict(final_dict)
130
131    def relation(self) -> str:
132        return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"

JSON field for DjangoQL

JSONSearchField( model=None, name=None, nullable=None, suggest_nested=False, fixed_structure: collections.OrderedDict | None = None)
19    def __init__(
20        self,
21        model=None,
22        name=None,
23        nullable=None,
24        suggest_nested=False,
25        fixed_structure: OrderedDict | None = None,
26    ):
27        # Set this in the constructor to not clobber the type variable
28        self.type = "relation"
29        self.suggest_nested = suggest_nested
30        self.fixed_structure = fixed_structure
31        super().__init__(model, name, nullable)
model: django.db.models.base.Model = None
type = 'str'
suggest_nested
fixed_structure
def get_lookup(self, path, operator, value):
33    def get_lookup(self, path, operator, value):
34        search = "__".join(path)
35        op, invert = self.get_operator(operator)
36        q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
37        return ~q if invert else q

Performs a lookup for this field with given path, operator and value.

Override this if you'd like to implement a fully custom lookup. It should support all comparison operators compatible with the field type.

:param path: a list of names preceding current lookup. For example, if expression looks like 'author.groups.name = "Foo"' path would be ['author', 'groups']. 'name' is not included, because it's the current field instance itself. :param operator: a string with comparison operator. It could be one of the following: '=', '!=', '>', '>=', '<', '<=', '~', '!~', 'in', 'not in'. Depending on the field type, some operators may be excluded. '~' and '!~' can be applied to StrField only and aren't allowed for any other fields. BoolField can't be used with less or greater operators, '>', '>=', '<' and '<=' are excluded for it. :param value: value passed for comparison :return: Q-object

def json_field_keys(self) -> Generator[tuple[str]]:
39    def json_field_keys(self) -> Generator[tuple[str]]:
40        with connection.cursor() as cursor:
41            cursor.execute(f"""
42                WITH RECURSIVE "{self.name}_keys" AS (
43                    SELECT
44                        ARRAY[jsonb_object_keys("{self.name}")] AS key_path_array,
45                        "{self.name}" -> jsonb_object_keys("{self.name}") AS value
46                    FROM {self.model._meta.db_table}
47                    WHERE "{self.name}" IS NOT NULL
48                        AND jsonb_typeof("{self.name}") = 'object'
49
50                    UNION ALL
51
52                    SELECT
53                        ck.key_path_array || jsonb_object_keys(ck.value),
54                        ck.value -> jsonb_object_keys(ck.value) AS value
55                    FROM "{self.name}_keys" ck
56                    WHERE jsonb_typeof(ck.value) = 'object'
57                ),
58
59                unique_paths AS (
60                    SELECT DISTINCT key_path_array
61                    FROM "{self.name}_keys"
62                )
63
64                SELECT key_path_array FROM unique_paths;
65            """)  # nosec
66            return (x[0] for x in cursor.fetchall())
def get_fixed_structure( self, serializer: djangoql.serializers.DjangoQLSchemaSerializer) -> collections.OrderedDict:
68    def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
69        new_dict = OrderedDict()
70        if not self.fixed_structure:
71            return new_dict
72        new_dict.setdefault(self.relation(), {})
73        for key, value in self.fixed_structure.items():
74            new_dict[self.relation()][key] = serializer.serialize_field(value)
75            if isinstance(value, JSONSearchField):
76                new_dict.update(value.get_nested_options(serializer))
77        return new_dict
def get_nested_options( self, serializer: djangoql.serializers.DjangoQLSchemaSerializer) -> collections.OrderedDict:
 79    def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
 80        """Get keys of all nested objects to show autocomplete"""
 81        if not self.suggest_nested:
 82            if self.fixed_structure:
 83                return self.get_fixed_structure(serializer)
 84            return OrderedDict()
 85
 86        def recursive_function(parts: list[str], parent_parts: list[str] | None = None):
 87            if not parent_parts:
 88                parent_parts = []
 89            path = parts.pop(0)
 90            parent_parts.append(path)
 91            relation_key = "_".join(parent_parts)
 92            if len(parts) > 1:
 93                out_dict = {
 94                    relation_key: {
 95                        parts[0]: {
 96                            "type": "relation",
 97                            "relation": f"{relation_key}_{parts[0]}",
 98                        }
 99                    }
100                }
101                child_paths = recursive_function(parts.copy(), parent_parts.copy())
102                child_paths.update(out_dict)
103                return child_paths
104            else:
105                return {relation_key: {parts[0]: {}}}
106
107        relation_structure = defaultdict(dict)
108
109        for relations in self.json_field_keys():
110            result = recursive_function([self.relation()] + relations)
111            for relation_key, value in result.items():
112                for sub_relation_key, sub_value in value.items():
113                    if not relation_structure[relation_key].get(sub_relation_key, None):
114                        relation_structure[relation_key][sub_relation_key] = sub_value
115                    else:
116                        relation_structure[relation_key][sub_relation_key].update(sub_value)
117
118        final_dict = defaultdict(dict)
119
120        for key, value in relation_structure.items():
121            for sub_key, sub_value in value.items():
122                if not sub_value:
123                    final_dict[key][sub_key] = {
124                        "type": "str",
125                        "nullable": True,
126                    }
127                else:
128                    final_dict[key][sub_key] = sub_value
129        return OrderedDict(final_dict)

Get keys of all nested objects to show autocomplete

def relation(self) -> str:
131    def relation(self) -> str:
132        return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"
class ChoiceSearchField(djangoql.schema.StrField):
135class ChoiceSearchField(StrField):
136    def __init__(self, model=None, name=None, nullable=None):
137        super().__init__(model, name, nullable, suggest_options=True)
138
139    def get_options(self, search):
140        result = []
141        choices = self._field_choices()
142        if choices:
143            search = search.lower()
144            for c in choices:
145                choice = text_type(c[0])
146                if search in choice.lower():
147                    result.append(choice)
148        return result

Abstract searchable field

ChoiceSearchField(model=None, name=None, nullable=None)
136    def __init__(self, model=None, name=None, nullable=None):
137        super().__init__(model, name, nullable, suggest_options=True)
def get_options(self, search):
139    def get_options(self, search):
140        result = []
141        choices = self._field_choices()
142        if choices:
143            search = search.lower()
144            for c in choices:
145                choice = text_type(c[0])
146                if search in choice.lower():
147                    result.append(choice)
148        return result

Override this method to provide custom suggestion options