authentik.policies.expression.evaluator
authentik expression policy evaluator
1"""authentik expression policy evaluator""" 2 3from ipaddress import ip_address 4from typing import TYPE_CHECKING 5 6from django.http import HttpRequest 7from structlog.stdlib import get_logger 8 9from authentik.events.models import Event 10from authentik.flows.planner import PLAN_CONTEXT_SSO 11from authentik.lib.expression.evaluator import BaseEvaluator 12from authentik.policies.exceptions import PolicyException 13from authentik.policies.types import PolicyRequest, PolicyResult 14from authentik.root.middleware import ClientIPMiddleware 15 16LOGGER = get_logger() 17if TYPE_CHECKING: 18 from authentik.policies.expression.models import ExpressionPolicy 19 20 21class PolicyEvaluator(BaseEvaluator): 22 """Validate and evaluate python-based expressions""" 23 24 _messages: list[str] 25 26 policy: ExpressionPolicy | None = None 27 28 def __init__(self, policy_name: str | None = None): 29 super().__init__(policy_name or "PolicyEvaluator") 30 self._messages = [] 31 # update website/docs/expressions/_objects.md 32 # update website/docs/expressions/_functions.md 33 self._context["ak_message"] = self.expr_func_message 34 self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator 35 36 def expr_func_message(self, message: str): 37 """Wrapper to append to messages list, which is returned with PolicyResult""" 38 self._messages.append(message) 39 40 def set_policy_request(self, request: PolicyRequest): 41 """Update context based on policy request (if http request is given, update that too)""" 42 # update website/docs/expressions/_objects.md 43 # update website/docs/expressions/_functions.md 44 self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False) 45 if request.http_request: 46 self.set_http_request(request.http_request) 47 self._context["request"] = request 48 self._context["context"] = request.context 49 if request.obj and isinstance(request.obj, Event): 50 self._context["ak_client_ip"] = ip_address( 51 request.obj.client_ip or ClientIPMiddleware.default_ip 52 ) 53 54 def set_http_request(self, request: HttpRequest): 55 """Update context based on http request""" 56 # update website/docs/expressions/_objects.md 57 # update website/docs/expressions/_functions.md 58 self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request)) 59 self._context["http_request"] = request 60 61 def handle_error(self, exc: Exception, expression_source: str): 62 """Exception Handler""" 63 raise PolicyException(exc) 64 65 def evaluate(self, expression_source: str) -> PolicyResult: 66 """Parse and evaluate expression. Policy is expected to return a truthy object. 67 Messages can be added using 'do ak_message()'.""" 68 try: 69 result = super().evaluate(expression_source) 70 except PolicyException as exc: 71 # PolicyExceptions should be propagated back to the process, 72 # which handles recording and returning a correct result 73 raise exc 74 except Exception as exc: # noqa 75 LOGGER.warning("Expression error", exc=exc) 76 return PolicyResult(False, str(exc)) 77 else: 78 policy_result = PolicyResult(False, *self._messages) 79 policy_result.raw_result = result 80 if result is None: 81 LOGGER.warning( 82 "Expression policy returned None", 83 src=expression_source, 84 policy=self._filename, 85 ) 86 policy_result.passing = False 87 if result: 88 policy_result.passing = bool(result) 89 return policy_result
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
22class PolicyEvaluator(BaseEvaluator): 23 """Validate and evaluate python-based expressions""" 24 25 _messages: list[str] 26 27 policy: ExpressionPolicy | None = None 28 29 def __init__(self, policy_name: str | None = None): 30 super().__init__(policy_name or "PolicyEvaluator") 31 self._messages = [] 32 # update website/docs/expressions/_objects.md 33 # update website/docs/expressions/_functions.md 34 self._context["ak_message"] = self.expr_func_message 35 self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator 36 37 def expr_func_message(self, message: str): 38 """Wrapper to append to messages list, which is returned with PolicyResult""" 39 self._messages.append(message) 40 41 def set_policy_request(self, request: PolicyRequest): 42 """Update context based on policy request (if http request is given, update that too)""" 43 # update website/docs/expressions/_objects.md 44 # update website/docs/expressions/_functions.md 45 self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False) 46 if request.http_request: 47 self.set_http_request(request.http_request) 48 self._context["request"] = request 49 self._context["context"] = request.context 50 if request.obj and isinstance(request.obj, Event): 51 self._context["ak_client_ip"] = ip_address( 52 request.obj.client_ip or ClientIPMiddleware.default_ip 53 ) 54 55 def set_http_request(self, request: HttpRequest): 56 """Update context based on http request""" 57 # update website/docs/expressions/_objects.md 58 # update website/docs/expressions/_functions.md 59 self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request)) 60 self._context["http_request"] = request 61 62 def handle_error(self, exc: Exception, expression_source: str): 63 """Exception Handler""" 64 raise PolicyException(exc) 65 66 def evaluate(self, expression_source: str) -> PolicyResult: 67 """Parse and evaluate expression. Policy is expected to return a truthy object. 68 Messages can be added using 'do ak_message()'.""" 69 try: 70 result = super().evaluate(expression_source) 71 except PolicyException as exc: 72 # PolicyExceptions should be propagated back to the process, 73 # which handles recording and returning a correct result 74 raise exc 75 except Exception as exc: # noqa 76 LOGGER.warning("Expression error", exc=exc) 77 return PolicyResult(False, str(exc)) 78 else: 79 policy_result = PolicyResult(False, *self._messages) 80 policy_result.raw_result = result 81 if result is None: 82 LOGGER.warning( 83 "Expression policy returned None", 84 src=expression_source, 85 policy=self._filename, 86 ) 87 policy_result.passing = False 88 if result: 89 policy_result.passing = bool(result) 90 return policy_result
Validate and evaluate python-based expressions
PolicyEvaluator(policy_name: str | None = None)
29 def __init__(self, policy_name: str | None = None): 30 super().__init__(policy_name or "PolicyEvaluator") 31 self._messages = [] 32 # update website/docs/expressions/_objects.md 33 # update website/docs/expressions/_functions.md 34 self._context["ak_message"] = self.expr_func_message 35 self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator
def
expr_func_message(self, message: str):
37 def expr_func_message(self, message: str): 38 """Wrapper to append to messages list, which is returned with PolicyResult""" 39 self._messages.append(message)
Wrapper to append to messages list, which is returned with PolicyResult
41 def set_policy_request(self, request: PolicyRequest): 42 """Update context based on policy request (if http request is given, update that too)""" 43 # update website/docs/expressions/_objects.md 44 # update website/docs/expressions/_functions.md 45 self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False) 46 if request.http_request: 47 self.set_http_request(request.http_request) 48 self._context["request"] = request 49 self._context["context"] = request.context 50 if request.obj and isinstance(request.obj, Event): 51 self._context["ak_client_ip"] = ip_address( 52 request.obj.client_ip or ClientIPMiddleware.default_ip 53 )
Update context based on policy request (if http request is given, update that too)
def
set_http_request(self, request: django.http.request.HttpRequest):
55 def set_http_request(self, request: HttpRequest): 56 """Update context based on http request""" 57 # update website/docs/expressions/_objects.md 58 # update website/docs/expressions/_functions.md 59 self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request)) 60 self._context["http_request"] = request
Update context based on http request
def
handle_error(self, exc: Exception, expression_source: str):
62 def handle_error(self, exc: Exception, expression_source: str): 63 """Exception Handler""" 64 raise PolicyException(exc)
Exception Handler
66 def evaluate(self, expression_source: str) -> PolicyResult: 67 """Parse and evaluate expression. Policy is expected to return a truthy object. 68 Messages can be added using 'do ak_message()'.""" 69 try: 70 result = super().evaluate(expression_source) 71 except PolicyException as exc: 72 # PolicyExceptions should be propagated back to the process, 73 # which handles recording and returning a correct result 74 raise exc 75 except Exception as exc: # noqa 76 LOGGER.warning("Expression error", exc=exc) 77 return PolicyResult(False, str(exc)) 78 else: 79 policy_result = PolicyResult(False, *self._messages) 80 policy_result.raw_result = result 81 if result is None: 82 LOGGER.warning( 83 "Expression policy returned None", 84 src=expression_source, 85 policy=self._filename, 86 ) 87 policy_result.passing = False 88 if result: 89 policy_result.passing = bool(result) 90 return policy_result
Parse and evaluate expression. Policy is expected to return a truthy object. Messages can be added using 'do ak_message()'.
Inherited Members
- authentik.lib.expression.evaluator.BaseEvaluator
- expr_resolve_dns
- expr_reverse_dns
- expr_flatten
- expr_regex_match
- expr_regex_replace
- expr_is_group_member
- expr_user_by
- expr_func_user_has_authenticator
- expr_event_create
- expr_func_call_policy
- expr_create_jwt
- expr_create_jwt_raw
- expr_send_email
- wrap_expression
- compile
- validate