authentik.enterprise.search.fields
DjangoQL search
"""DjangoQL search"""

from collections import OrderedDict, defaultdict
from collections.abc import Generator

from django.db import connection
from django.db.models import Model, Q
from djangoql.compat import text_type
from djangoql.schema import StrField
from djangoql.serializers import DjangoQLSchemaSerializer


class JSONSearchField(StrField):
    """JSON field for DjangoQL"""

    model: Model

    def __init__(
        self,
        model=None,
        name=None,
        nullable=None,
        suggest_nested=False,
        fixed_structure: OrderedDict | None = None,
    ):
        """Create a JSON search field.

        :param model: model the field belongs to, forwarded to the parent class.
        :param name: field name, forwarded to the parent class.
        :param nullable: nullability flag, forwarded to the parent class.
        :param suggest_nested: when true, `get_nested_options` looks up the
            nested keys stored in the database.
        :param fixed_structure: optional mapping of key name to field object,
            used by `get_nested_options` when `suggest_nested` is false.
        """
        # Set this in the constructor to not clobber the type variable
        self.type = "relation"
        self.suggest_nested = suggest_nested
        self.fixed_structure = fixed_structure
        super().__init__(model, name, nullable)

    def get_lookup(self, path, operator, value):
        """Build the Q object for a comparison against a key of this field.

        :param path: list of names that are joined with "__" to form the lookup.
        :param operator: comparison operator, resolved through `get_operator`.
        :param value: value to compare with, converted by `get_lookup_value`.
        :return: Q object, negated when `get_operator` asks for inversion.
        """
        search = "__".join(path)
        op, invert = self.get_operator(operator)
        q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
        return ~q if invert else q

    def json_field_keys(self) -> Generator[list[str], None, None]:
        """Return every distinct key path stored in this JSON column.

        Runs a recursive query (using the `jsonb_*` SQL functions) over the
        model's table that descends into every value of type 'object' and
        collects the path of keys leading to each value.

        :return: generator of key paths, each one a list of keys ordered from
            the outermost key to the innermost one.
        """
        # Quote every identifier: an unquoted table name is case-folded by the
        # database and breaks for names that need quoting.
        quote = connection.ops.quote_name
        column = quote(self.name)
        table = quote(self.model._meta.db_table)
        keys_cte = quote(f"{self.name}_keys")
        with connection.cursor() as cursor:
            cursor.execute(f"""
                WITH RECURSIVE {keys_cte} AS (
                    SELECT
                        ARRAY[jsonb_object_keys({column})] AS key_path_array,
                        {column} -> jsonb_object_keys({column}) AS value
                    FROM {table}
                    WHERE {column} IS NOT NULL
                        AND jsonb_typeof({column}) = 'object'

                    UNION ALL

                    SELECT
                        ck.key_path_array || jsonb_object_keys(ck.value),
                        ck.value -> jsonb_object_keys(ck.value) AS value
                    FROM {keys_cte} ck
                    WHERE jsonb_typeof(ck.value) = 'object'
                ),

                unique_paths AS (
                    SELECT DISTINCT key_path_array
                    FROM {keys_cte}
                )

                SELECT key_path_array FROM unique_paths;
            """)  # nosec
            # fetchall() is evaluated right here, while the cursor is still
            # open; only the per-row conversion happens lazily.
            return (list(row[0]) for row in cursor.fetchall())

    def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
        """Serialize `fixed_structure` into relation entries.

        :param serializer: serializer whose `serialize_field` is applied to
            every field of the fixed structure.
        :return: mapping of relation name to serialized fields; empty when no
            fixed structure is configured.
        """
        new_dict = OrderedDict()
        if not self.fixed_structure:
            return new_dict
        own_relation = self.relation()
        new_dict.setdefault(own_relation, {})
        for key, value in self.fixed_structure.items():
            new_dict[own_relation][key] = serializer.serialize_field(value)
            if isinstance(value, JSONSearchField):
                # Nested JSON fields contribute their own relations as well
                new_dict.update(value.get_nested_options(serializer))
        return new_dict

    def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
        """Get keys of all nested objects to show autocomplete

        When `suggest_nested` is false the fixed structure (if any) is
        returned instead and the database is not queried.

        :param serializer: serializer used for the fixed structure.
        :return: mapping of relation name to a mapping of key name to field
            description. Keys that lead to further nesting are described as
            relations, all other keys as nullable strings.
        """
        if not self.suggest_nested:
            if self.fixed_structure:
                return self.get_fixed_structure(serializer)
            return OrderedDict()

        root_relation = self.relation()
        relation_structure = defaultdict(dict)

        for key_path in self.json_field_keys():
            # Accept any sequence of keys (list or tuple)
            keys = list(key_path)
            if not keys:
                continue
            # Relation name of every nesting level: parent relation + "_" + key
            relation_names = [root_relation]
            for key in keys[:-1]:
                relation_names.append(f"{relation_names[-1]}_{key}")
            last = len(keys) - 1
            # Register the deepest level first; the result is an ordered
            # mapping, so this keeps nested relations ahead of their parents.
            for depth in range(last, -1, -1):
                key = keys[depth]
                entry = relation_structure[relation_names[depth]].setdefault(key, {})
                if depth < last:
                    entry.update(
                        {
                            "type": "relation",
                            "relation": f"{relation_names[depth]}_{key}",
                        }
                    )

        final_dict = OrderedDict()
        for relation_name, fields in relation_structure.items():
            # Entries that never received relation details are leaf values
            final_dict[relation_name] = {
                key: entry if entry else {"type": "str", "nullable": True}
                for key, entry in fields.items()
            }
        return final_dict

    def relation(self) -> str:
        """Return the relation name of this field: `<app_label>.<model_name>_<name>`."""
        return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"


class ChoiceSearchField(StrField):
    """String field that suggests the values returned by `_field_choices()`."""

    def __init__(self, model=None, name=None, nullable=None):
        """Create the field with option suggestions always enabled."""
        super().__init__(model, name, nullable, suggest_options=True)

    def get_options(self, search):
        """Return the choice values containing `search`, ignoring case.

        :param search: text typed so far.
        :return: list with the first element of every matching choice,
            converted to text.
        """
        choices = self._field_choices()
        if not choices:
            return []
        needle = search.lower()
        candidates = (text_type(choice[0]) for choice in choices)
        return [candidate for candidate in candidates if needle in candidate.lower()]
class JSONSearchField(StrField):
    """JSON field for DjangoQL"""

    model: Model

    def __init__(
        self,
        model=None,
        name=None,
        nullable=None,
        suggest_nested=False,
        fixed_structure: OrderedDict | None = None,
    ):
        """Create a JSON search field.

        :param model: model the field belongs to, forwarded to the parent class.
        :param name: field name, forwarded to the parent class.
        :param nullable: nullability flag, forwarded to the parent class.
        :param suggest_nested: when true, `get_nested_options` looks up the
            nested keys stored in the database.
        :param fixed_structure: optional mapping of key name to field object,
            used by `get_nested_options` when `suggest_nested` is false.
        """
        # Set this in the constructor to not clobber the type variable
        self.type = "relation"
        self.suggest_nested = suggest_nested
        self.fixed_structure = fixed_structure
        super().__init__(model, name, nullable)

    def get_lookup(self, path, operator, value):
        """Build the Q object for a comparison against a key of this field.

        :param path: list of names that are joined with "__" to form the lookup.
        :param operator: comparison operator, resolved through `get_operator`.
        :param value: value to compare with, converted by `get_lookup_value`.
        :return: Q object, negated when `get_operator` asks for inversion.
        """
        search = "__".join(path)
        op, invert = self.get_operator(operator)
        q = Q(**{f"{search}{op}": self.get_lookup_value(value)})
        return ~q if invert else q

    def json_field_keys(self) -> Generator[list[str], None, None]:
        """Return every distinct key path stored in this JSON column.

        Runs a recursive query (using the `jsonb_*` SQL functions) over the
        model's table that descends into every value of type 'object' and
        collects the path of keys leading to each value.

        :return: generator of key paths, each one a list of keys ordered from
            the outermost key to the innermost one.
        """
        # Quote every identifier: an unquoted table name is case-folded by the
        # database and breaks for names that need quoting.
        quote = connection.ops.quote_name
        column = quote(self.name)
        table = quote(self.model._meta.db_table)
        keys_cte = quote(f"{self.name}_keys")
        with connection.cursor() as cursor:
            cursor.execute(f"""
                WITH RECURSIVE {keys_cte} AS (
                    SELECT
                        ARRAY[jsonb_object_keys({column})] AS key_path_array,
                        {column} -> jsonb_object_keys({column}) AS value
                    FROM {table}
                    WHERE {column} IS NOT NULL
                        AND jsonb_typeof({column}) = 'object'

                    UNION ALL

                    SELECT
                        ck.key_path_array || jsonb_object_keys(ck.value),
                        ck.value -> jsonb_object_keys(ck.value) AS value
                    FROM {keys_cte} ck
                    WHERE jsonb_typeof(ck.value) = 'object'
                ),

                unique_paths AS (
                    SELECT DISTINCT key_path_array
                    FROM {keys_cte}
                )

                SELECT key_path_array FROM unique_paths;
            """)  # nosec
            # fetchall() is evaluated right here, while the cursor is still
            # open; only the per-row conversion happens lazily.
            return (list(row[0]) for row in cursor.fetchall())

    def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
        """Serialize `fixed_structure` into relation entries.

        :param serializer: serializer whose `serialize_field` is applied to
            every field of the fixed structure.
        :return: mapping of relation name to serialized fields; empty when no
            fixed structure is configured.
        """
        new_dict = OrderedDict()
        if not self.fixed_structure:
            return new_dict
        own_relation = self.relation()
        new_dict.setdefault(own_relation, {})
        for key, value in self.fixed_structure.items():
            new_dict[own_relation][key] = serializer.serialize_field(value)
            if isinstance(value, JSONSearchField):
                # Nested JSON fields contribute their own relations as well
                new_dict.update(value.get_nested_options(serializer))
        return new_dict

    def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
        """Get keys of all nested objects to show autocomplete

        When `suggest_nested` is false the fixed structure (if any) is
        returned instead and the database is not queried.

        :param serializer: serializer used for the fixed structure.
        :return: mapping of relation name to a mapping of key name to field
            description. Keys that lead to further nesting are described as
            relations, all other keys as nullable strings.
        """
        if not self.suggest_nested:
            if self.fixed_structure:
                return self.get_fixed_structure(serializer)
            return OrderedDict()

        root_relation = self.relation()
        relation_structure = defaultdict(dict)

        for key_path in self.json_field_keys():
            # Accept any sequence of keys (list or tuple)
            keys = list(key_path)
            if not keys:
                continue
            # Relation name of every nesting level: parent relation + "_" + key
            relation_names = [root_relation]
            for key in keys[:-1]:
                relation_names.append(f"{relation_names[-1]}_{key}")
            last = len(keys) - 1
            # Register the deepest level first; the result is an ordered
            # mapping, so this keeps nested relations ahead of their parents.
            for depth in range(last, -1, -1):
                key = keys[depth]
                entry = relation_structure[relation_names[depth]].setdefault(key, {})
                if depth < last:
                    entry.update(
                        {
                            "type": "relation",
                            "relation": f"{relation_names[depth]}_{key}",
                        }
                    )

        final_dict = OrderedDict()
        for relation_name, fields in relation_structure.items():
            # Entries that never received relation details are leaf values
            final_dict[relation_name] = {
                key: entry if entry else {"type": "str", "nullable": True}
                for key, entry in fields.items()
            }
        return final_dict

    def relation(self) -> str:
        """Return the relation name of this field: `<app_label>.<model_name>_<name>`."""
        return f"{self.model._meta.app_label}.{self.model._meta.model_name}_{self.name}"
JSON field for DjangoQL
def __init__(
    self,
    model=None,
    name=None,
    nullable=None,
    suggest_nested=False,
    fixed_structure: OrderedDict | None = None,
):
    """Create a JSON search field.

    :param model: model the field belongs to, forwarded to the parent class.
    :param name: field name, forwarded to the parent class.
    :param nullable: nullability flag, forwarded to the parent class.
    :param suggest_nested: stored on the instance as `suggest_nested`.
    :param fixed_structure: optional ordered mapping stored on the instance
        as `fixed_structure`.
    """
    self.fixed_structure = fixed_structure
    self.suggest_nested = suggest_nested
    # The type is assigned per instance so that the class-level type
    # variable is left untouched.
    self.type = "relation"
    super().__init__(model, name, nullable)
def get_lookup(self, path, operator, value):
    """Build the Q object for a comparison against a key of this field.

    :param path: list of names that are joined with "__" to form the lookup.
    :param operator: comparison operator, resolved through `get_operator`.
    :param value: value to compare with, converted by `get_lookup_value`.
    :return: Q object, negated when `get_operator` asks for inversion.
    """
    lookup_prefix = "__".join(path)
    lookup_suffix, negate = self.get_operator(operator)
    lookup_kwargs = {f"{lookup_prefix}{lookup_suffix}": self.get_lookup_value(value)}
    condition = Q(**lookup_kwargs)
    if negate:
        return ~condition
    return condition
Performs a lookup for this field with given path, operator and value.
Override this if you'd like to implement a fully custom lookup. It should support all comparison operators compatible with the field type.
:param path: a list of names preceding current lookup. For example, if expression looks like 'author.groups.name = "Foo"' path would be ['author', 'groups']. 'name' is not included, because it's the current field instance itself. :param operator: a string with comparison operator. It could be one of the following: '=', '!=', '>', '>=', '<', '<=', '~', '!~', 'in', 'not in'. Depending on the field type, some operators may be excluded. '~' and '!~' can be applied to StrField only and aren't allowed for any other fields. BoolField can't be used with less or greater operators, '>', '>=', '<' and '<=' are excluded for it. :param value: value passed for comparison :return: Q-object
def json_field_keys(self) -> Generator[list[str], None, None]:
    """Return every distinct key path stored in this JSON column.

    Runs a recursive query (using the `jsonb_*` SQL functions) over the
    model's table that descends into every value of type 'object' and
    collects the path of keys leading to each value.

    :return: generator of key paths, each one a list of keys ordered from
        the outermost key to the innermost one.
    """
    # Quote every identifier: an unquoted table name is case-folded by the
    # database and breaks for names that need quoting.
    quote = connection.ops.quote_name
    column = quote(self.name)
    table = quote(self.model._meta.db_table)
    keys_cte = quote(f"{self.name}_keys")
    with connection.cursor() as cursor:
        cursor.execute(f"""
            WITH RECURSIVE {keys_cte} AS (
                SELECT
                    ARRAY[jsonb_object_keys({column})] AS key_path_array,
                    {column} -> jsonb_object_keys({column}) AS value
                FROM {table}
                WHERE {column} IS NOT NULL
                    AND jsonb_typeof({column}) = 'object'

                UNION ALL

                SELECT
                    ck.key_path_array || jsonb_object_keys(ck.value),
                    ck.value -> jsonb_object_keys(ck.value) AS value
                FROM {keys_cte} ck
                WHERE jsonb_typeof(ck.value) = 'object'
            ),

            unique_paths AS (
                SELECT DISTINCT key_path_array
                FROM {keys_cte}
            )

            SELECT key_path_array FROM unique_paths;
        """)  # nosec
        # fetchall() is evaluated right here, while the cursor is still
        # open; only the per-row conversion happens lazily.
        return (list(row[0]) for row in cursor.fetchall())
def get_fixed_structure(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
    """Serialize `fixed_structure` into relation entries.

    :param serializer: serializer whose `serialize_field` is applied to
        every field of the fixed structure.
    :return: ordered mapping of relation name to serialized fields; empty
        when no fixed structure is configured.
    """
    structure = OrderedDict()
    if self.fixed_structure:
        own_relation = self.relation()
        structure.setdefault(own_relation, {})
        for field_key, field in self.fixed_structure.items():
            structure[own_relation][field_key] = serializer.serialize_field(field)
            if not isinstance(field, JSONSearchField):
                continue
            # Nested JSON fields contribute their own relations as well
            structure.update(field.get_nested_options(serializer))
    return structure
def get_nested_options(self, serializer: DjangoQLSchemaSerializer) -> OrderedDict:
    """Get keys of all nested objects to show autocomplete

    When `suggest_nested` is false the fixed structure (if any) is
    returned instead and the database is not queried.

    :param serializer: serializer used for the fixed structure.
    :return: mapping of relation name to a mapping of key name to field
        description. Keys that lead to further nesting are described as
        relations, all other keys as nullable strings.
    """
    if not self.suggest_nested:
        if self.fixed_structure:
            return self.get_fixed_structure(serializer)
        return OrderedDict()

    root_relation = self.relation()
    relation_structure = defaultdict(dict)

    for key_path in self.json_field_keys():
        # Accept any sequence of keys (list or tuple)
        keys = list(key_path)
        if not keys:
            continue
        # Relation name of every nesting level: parent relation + "_" + key
        relation_names = [root_relation]
        for key in keys[:-1]:
            relation_names.append(f"{relation_names[-1]}_{key}")
        last = len(keys) - 1
        # Register the deepest level first; the result is an ordered
        # mapping, so this keeps nested relations ahead of their parents.
        for depth in range(last, -1, -1):
            key = keys[depth]
            entry = relation_structure[relation_names[depth]].setdefault(key, {})
            if depth < last:
                entry.update(
                    {
                        "type": "relation",
                        "relation": f"{relation_names[depth]}_{key}",
                    }
                )

    final_dict = OrderedDict()
    for relation_name, fields in relation_structure.items():
        # Entries that never received relation details are leaf values
        final_dict[relation_name] = {
            key: entry if entry else {"type": "str", "nullable": True}
            for key, entry in fields.items()
        }
    return final_dict
Get keys of all nested objects to show autocomplete
class ChoiceSearchField(StrField):
    """String field that suggests the values returned by `_field_choices()`."""

    def __init__(self, model=None, name=None, nullable=None):
        """Create the field with option suggestions always enabled."""
        super().__init__(model, name, nullable, suggest_options=True)

    def get_options(self, search):
        """Return the choice values containing `search`, ignoring case.

        :param search: text typed so far.
        :return: list with the first element of every matching choice,
            converted to text.
        """
        choices = self._field_choices()
        if not choices:
            return []
        needle = search.lower()
        candidates = (text_type(choice[0]) for choice in choices)
        return [candidate for candidate in candidates if needle in candidate.lower()]
Abstract searchable field
def get_options(self, search):
    """Return the choice values containing `search`, ignoring case.

    :param search: text typed so far.
    :return: list with the first element of every choice returned by
        `_field_choices()` whose text form contains `search`.
    """
    choices = self._field_choices()
    if not choices:
        return []
    needle = search.lower()
    candidates = (text_type(choice[0]) for choice in choices)
    return [candidate for candidate in candidates if needle in candidate.lower()]
Override this method to provide custom suggestion options