authentik.lib.sync.mapper
1from collections.abc import Generator 2 3from django.db.models import QuerySet 4from django.http import HttpRequest 5 6from authentik.core.expression.evaluator import PropertyMappingEvaluator 7from authentik.core.expression.exceptions import ( 8 PropertyMappingExpressionException, 9) 10from authentik.core.models import PropertyMapping, User 11from authentik.lib.expression.exceptions import ControlFlowException 12 13 14class PropertyMappingManager: 15 """Pre-compile and cache property mappings when an identical 16 set is used multiple times""" 17 18 query_set: QuerySet[PropertyMapping] 19 mapping_subclass: type[PropertyMapping] 20 21 _evaluators: list[PropertyMappingEvaluator] 22 23 globals: dict 24 25 __has_compiled: bool 26 27 def __init__( 28 self, 29 qs: QuerySet[PropertyMapping], 30 # Expected subclass of PropertyMappings, any objects in the queryset 31 # that are not an instance of this class will be discarded 32 mapping_subclass: type[PropertyMapping], 33 # As they keys of parameters are part of the compilation, 34 # we need a list of all parameter names that will be used during evaluation 35 context_keys: list[str], 36 ) -> None: 37 self.query_set = qs.order_by("name") 38 self.mapping_subclass = mapping_subclass 39 self.context_keys = context_keys 40 self.globals = {} 41 self.__has_compiled = False 42 43 def compile(self): 44 self._evaluators = [] 45 for mapping in self.query_set: 46 if not isinstance(mapping, self.mapping_subclass): 47 continue 48 evaluator = PropertyMappingEvaluator( 49 mapping, **{key: None for key in self.context_keys} 50 ) 51 evaluator._globals.update(self.globals) 52 # Compile and cache expression 53 evaluator.compile() 54 self._evaluators.append(evaluator) 55 56 def iter_eval( 57 self, 58 user: User | None, 59 request: HttpRequest | None, 60 return_mapping: bool = False, 61 **kwargs, 62 ) -> Generator[tuple[dict, PropertyMapping]]: 63 """Iterate over all mappings that were pre-compiled and 64 execute all of them with the given context""" 65 if not self.__has_compiled: 66 self.compile() 67 self.__has_compiled = True 68 for mapping in self._evaluators: 69 mapping.set_context(user, request, **kwargs) 70 try: 71 value = mapping.evaluate(mapping.model.expression) 72 except (PropertyMappingExpressionException, ControlFlowException) as exc: 73 raise exc from exc 74 except Exception as exc: 75 raise PropertyMappingExpressionException(exc, mapping.model) from exc 76 if value is None: 77 continue 78 if return_mapping: 79 yield value, mapping.model 80 else: 81 yield value
class
PropertyMappingManager:
15class PropertyMappingManager: 16 """Pre-compile and cache property mappings when an identical 17 set is used multiple times""" 18 19 query_set: QuerySet[PropertyMapping] 20 mapping_subclass: type[PropertyMapping] 21 22 _evaluators: list[PropertyMappingEvaluator] 23 24 globals: dict 25 26 __has_compiled: bool 27 28 def __init__( 29 self, 30 qs: QuerySet[PropertyMapping], 31 # Expected subclass of PropertyMappings, any objects in the queryset 32 # that are not an instance of this class will be discarded 33 mapping_subclass: type[PropertyMapping], 34 # As they keys of parameters are part of the compilation, 35 # we need a list of all parameter names that will be used during evaluation 36 context_keys: list[str], 37 ) -> None: 38 self.query_set = qs.order_by("name") 39 self.mapping_subclass = mapping_subclass 40 self.context_keys = context_keys 41 self.globals = {} 42 self.__has_compiled = False 43 44 def compile(self): 45 self._evaluators = [] 46 for mapping in self.query_set: 47 if not isinstance(mapping, self.mapping_subclass): 48 continue 49 evaluator = PropertyMappingEvaluator( 50 mapping, **{key: None for key in self.context_keys} 51 ) 52 evaluator._globals.update(self.globals) 53 # Compile and cache expression 54 evaluator.compile() 55 self._evaluators.append(evaluator) 56 57 def iter_eval( 58 self, 59 user: User | None, 60 request: HttpRequest | None, 61 return_mapping: bool = False, 62 **kwargs, 63 ) -> Generator[tuple[dict, PropertyMapping]]: 64 """Iterate over all mappings that were pre-compiled and 65 execute all of them with the given context""" 66 if not self.__has_compiled: 67 self.compile() 68 self.__has_compiled = True 69 for mapping in self._evaluators: 70 mapping.set_context(user, request, **kwargs) 71 try: 72 value = mapping.evaluate(mapping.model.expression) 73 except (PropertyMappingExpressionException, ControlFlowException) as exc: 74 raise exc from exc 75 except Exception as exc: 76 raise PropertyMappingExpressionException(exc, mapping.model) from exc 77 if value is None: 78 continue 79 if return_mapping: 80 yield value, mapping.model 81 else: 82 yield value
Pre-compile and cache property mappings when an identical set is used multiple times
PropertyMappingManager( qs: django.db.models.query.QuerySet, mapping_subclass: type[authentik.core.models.PropertyMapping], context_keys: list[str])
28 def __init__( 29 self, 30 qs: QuerySet[PropertyMapping], 31 # Expected subclass of PropertyMappings, any objects in the queryset 32 # that are not an instance of this class will be discarded 33 mapping_subclass: type[PropertyMapping], 34 # As they keys of parameters are part of the compilation, 35 # we need a list of all parameter names that will be used during evaluation 36 context_keys: list[str], 37 ) -> None: 38 self.query_set = qs.order_by("name") 39 self.mapping_subclass = mapping_subclass 40 self.context_keys = context_keys 41 self.globals = {} 42 self.__has_compiled = False
mapping_subclass: type[authentik.core.models.PropertyMapping]
def
compile(self):
44 def compile(self): 45 self._evaluators = [] 46 for mapping in self.query_set: 47 if not isinstance(mapping, self.mapping_subclass): 48 continue 49 evaluator = PropertyMappingEvaluator( 50 mapping, **{key: None for key in self.context_keys} 51 ) 52 evaluator._globals.update(self.globals) 53 # Compile and cache expression 54 evaluator.compile() 55 self._evaluators.append(evaluator)
def
iter_eval( self, user: authentik.core.models.User | None, request: django.http.request.HttpRequest | None, return_mapping: bool = False, **kwargs) -> Generator[tuple[dict, authentik.core.models.PropertyMapping]]:
57 def iter_eval( 58 self, 59 user: User | None, 60 request: HttpRequest | None, 61 return_mapping: bool = False, 62 **kwargs, 63 ) -> Generator[tuple[dict, PropertyMapping]]: 64 """Iterate over all mappings that were pre-compiled and 65 execute all of them with the given context""" 66 if not self.__has_compiled: 67 self.compile() 68 self.__has_compiled = True 69 for mapping in self._evaluators: 70 mapping.set_context(user, request, **kwargs) 71 try: 72 value = mapping.evaluate(mapping.model.expression) 73 except (PropertyMappingExpressionException, ControlFlowException) as exc: 74 raise exc from exc 75 except Exception as exc: 76 raise PropertyMappingExpressionException(exc, mapping.model) from exc 77 if value is None: 78 continue 79 if return_mapping: 80 yield value, mapping.model 81 else: 82 yield value
Iterate over all mappings that were pre-compiled and execute all of them with the given context