authentik.core.expression.evaluator
Property Mapping Evaluator
1"""Property Mapping Evaluator""" 2 3from types import CodeType 4from typing import Any 5 6from django.db.models import Model 7from django.http import HttpRequest 8from prometheus_client import Histogram 9 10from authentik.core.expression.exceptions import SkipObjectException 11from authentik.core.models import User 12from authentik.events.models import Event, EventAction 13from authentik.lib.expression.evaluator import BaseEvaluator 14from authentik.policies.types import PolicyRequest 15 16PROPERTY_MAPPING_TIME = Histogram( 17 "authentik_property_mapping_execution_time", 18 "Evaluation time of property mappings", 19 ["mapping_name"], 20) 21 22 23class PropertyMappingEvaluator(BaseEvaluator): 24 """Custom Evaluator that adds some different context variables.""" 25 26 dry_run: bool 27 model: Model 28 _compiled: CodeType | None = None 29 30 def __init__( 31 self, 32 model: Model, 33 user: User | None = None, 34 request: HttpRequest | None = None, 35 dry_run: bool | None = False, 36 **kwargs, 37 ): 38 self.model = model 39 if hasattr(model, "name"): 40 _filename = model.name 41 else: 42 _filename = str(model) 43 super().__init__(filename=_filename) 44 self.dry_run = dry_run 45 self.set_context(user, request, **kwargs) 46 47 def set_context( 48 self, 49 user: User | None = None, 50 request: HttpRequest | None = None, 51 **kwargs, 52 ): 53 req = PolicyRequest(user=User()) 54 req.obj = self.model 55 if user: 56 req.user = user 57 self._context["user"] = user 58 if request: 59 req.http_request = request 60 self._context["http_request"] = request 61 req.context.update(**kwargs) 62 self._context["request"] = req 63 self._context.update(**kwargs) 64 self._globals["SkipObject"] = SkipObjectException 65 66 def handle_error(self, exc: Exception, expression_source: str): 67 """Exception Handler""" 68 # For dry-run requests we don't save exceptions 69 if self.dry_run: 70 return 71 event = Event.new( 72 EventAction.PROPERTY_MAPPING_EXCEPTION, 73 expression=expression_source, 74 message="Failed to execute property mapping", 75 ).with_exception(exc) 76 if "request" in self._context: 77 req: PolicyRequest = self._context["request"] 78 if req.http_request: 79 event.from_http(req.http_request, req.user) 80 return 81 elif req.user: 82 event.set_user(req.user) 83 event.save() 84 85 def evaluate(self, *args, **kwargs) -> Any: 86 with PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time(): 87 return super().evaluate(*args, **kwargs) 88 89 def compile(self, expression: str | None = None) -> Any: 90 if not self._compiled: 91 compiled = super().compile(expression or self.model.expression) 92 self._compiled = compiled 93 return self._compiled
PROPERTY_MAPPING_TIME =
prometheus_client.metrics.Histogram(authentik_property_mapping_execution_time)
24class PropertyMappingEvaluator(BaseEvaluator): 25 """Custom Evaluator that adds some different context variables.""" 26 27 dry_run: bool 28 model: Model 29 _compiled: CodeType | None = None 30 31 def __init__( 32 self, 33 model: Model, 34 user: User | None = None, 35 request: HttpRequest | None = None, 36 dry_run: bool | None = False, 37 **kwargs, 38 ): 39 self.model = model 40 if hasattr(model, "name"): 41 _filename = model.name 42 else: 43 _filename = str(model) 44 super().__init__(filename=_filename) 45 self.dry_run = dry_run 46 self.set_context(user, request, **kwargs) 47 48 def set_context( 49 self, 50 user: User | None = None, 51 request: HttpRequest | None = None, 52 **kwargs, 53 ): 54 req = PolicyRequest(user=User()) 55 req.obj = self.model 56 if user: 57 req.user = user 58 self._context["user"] = user 59 if request: 60 req.http_request = request 61 self._context["http_request"] = request 62 req.context.update(**kwargs) 63 self._context["request"] = req 64 self._context.update(**kwargs) 65 self._globals["SkipObject"] = SkipObjectException 66 67 def handle_error(self, exc: Exception, expression_source: str): 68 """Exception Handler""" 69 # For dry-run requests we don't save exceptions 70 if self.dry_run: 71 return 72 event = Event.new( 73 EventAction.PROPERTY_MAPPING_EXCEPTION, 74 expression=expression_source, 75 message="Failed to execute property mapping", 76 ).with_exception(exc) 77 if "request" in self._context: 78 req: PolicyRequest = self._context["request"] 79 if req.http_request: 80 event.from_http(req.http_request, req.user) 81 return 82 elif req.user: 83 event.set_user(req.user) 84 event.save() 85 86 def evaluate(self, *args, **kwargs) -> Any: 87 with PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time(): 88 return super().evaluate(*args, **kwargs) 89 90 def compile(self, expression: str | None = None) -> Any: 91 if not self._compiled: 92 compiled = super().compile(expression or self.model.expression) 93 self._compiled = compiled 94 return self._compiled
Custom Evaluator that adds some different context variables.
PropertyMappingEvaluator( model: django.db.models.base.Model, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, dry_run: bool | None = False, **kwargs)
31 def __init__( 32 self, 33 model: Model, 34 user: User | None = None, 35 request: HttpRequest | None = None, 36 dry_run: bool | None = False, 37 **kwargs, 38 ): 39 self.model = model 40 if hasattr(model, "name"): 41 _filename = model.name 42 else: 43 _filename = str(model) 44 super().__init__(filename=_filename) 45 self.dry_run = dry_run 46 self.set_context(user, request, **kwargs)
def
set_context( self, user: authentik.core.models.User | None = None, request: django.http.request.HttpRequest | None = None, **kwargs):
48 def set_context( 49 self, 50 user: User | None = None, 51 request: HttpRequest | None = None, 52 **kwargs, 53 ): 54 req = PolicyRequest(user=User()) 55 req.obj = self.model 56 if user: 57 req.user = user 58 self._context["user"] = user 59 if request: 60 req.http_request = request 61 self._context["http_request"] = request 62 req.context.update(**kwargs) 63 self._context["request"] = req 64 self._context.update(**kwargs) 65 self._globals["SkipObject"] = SkipObjectException
def
handle_error(self, exc: Exception, expression_source: str):
67 def handle_error(self, exc: Exception, expression_source: str): 68 """Exception Handler""" 69 # For dry-run requests we don't save exceptions 70 if self.dry_run: 71 return 72 event = Event.new( 73 EventAction.PROPERTY_MAPPING_EXCEPTION, 74 expression=expression_source, 75 message="Failed to execute property mapping", 76 ).with_exception(exc) 77 if "request" in self._context: 78 req: PolicyRequest = self._context["request"] 79 if req.http_request: 80 event.from_http(req.http_request, req.user) 81 return 82 elif req.user: 83 event.set_user(req.user) 84 event.save()
Exception Handler
def
evaluate(self, *args, **kwargs) -> Any:
86 def evaluate(self, *args, **kwargs) -> Any: 87 with PROPERTY_MAPPING_TIME.labels(mapping_name=self._filename).time(): 88 return super().evaluate(*args, **kwargs)
Parse and evaluate expression. If the syntax is incorrect, a SyntaxError is raised. If any exception is raised during execution, it is raised. The result is returned without any type-checking.
def
compile(self, expression: str | None = None) -> Any:
90 def compile(self, expression: str | None = None) -> Any: 91 if not self._compiled: 92 compiled = super().compile(expression or self.model.expression) 93 self._compiled = compiled 94 return self._compiled
Parse expression. Raises SyntaxError or ValueError if the syntax is incorrect.
Inherited Members
- authentik.lib.expression.evaluator.BaseEvaluator
- expr_resolve_dns
- expr_reverse_dns
- expr_flatten
- expr_regex_match
- expr_regex_replace
- expr_is_group_member
- expr_user_by
- expr_func_user_has_authenticator
- expr_event_create
- expr_func_call_policy
- expr_create_jwt
- expr_create_jwt_raw
- expr_send_email
- wrap_expression
- validate