authentik.policies.expression.evaluator

authentik expression policy evaluator

 1"""authentik expression policy evaluator"""
 2
 3from ipaddress import ip_address
 4from typing import TYPE_CHECKING
 5
 6from django.http import HttpRequest
 7from structlog.stdlib import get_logger
 8
 9from authentik.events.models import Event
10from authentik.flows.planner import PLAN_CONTEXT_SSO
11from authentik.lib.expression.evaluator import BaseEvaluator
12from authentik.policies.exceptions import PolicyException
13from authentik.policies.types import PolicyRequest, PolicyResult
14from authentik.root.middleware import ClientIPMiddleware
15
16LOGGER = get_logger()
17if TYPE_CHECKING:
18    from authentik.policies.expression.models import ExpressionPolicy
19
20
21class PolicyEvaluator(BaseEvaluator):
22    """Validate and evaluate python-based expressions"""
23
24    _messages: list[str]
25
26    policy: ExpressionPolicy | None = None
27
28    def __init__(self, policy_name: str | None = None):
29        super().__init__(policy_name or "PolicyEvaluator")
30        self._messages = []
31        # update website/docs/expressions/_objects.md
32        # update website/docs/expressions/_functions.md
33        self._context["ak_message"] = self.expr_func_message
34        self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator
35
36    def expr_func_message(self, message: str):
37        """Wrapper to append to messages list, which is returned with PolicyResult"""
38        self._messages.append(message)
39
40    def set_policy_request(self, request: PolicyRequest):
41        """Update context based on policy request (if http request is given, update that too)"""
42        # update website/docs/expressions/_objects.md
43        # update website/docs/expressions/_functions.md
44        self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False)
45        if request.http_request:
46            self.set_http_request(request.http_request)
47        self._context["request"] = request
48        self._context["context"] = request.context
49        if request.obj and isinstance(request.obj, Event):
50            self._context["ak_client_ip"] = ip_address(
51                request.obj.client_ip or ClientIPMiddleware.default_ip
52            )
53
54    def set_http_request(self, request: HttpRequest):
55        """Update context based on http request"""
56        # update website/docs/expressions/_objects.md
57        # update website/docs/expressions/_functions.md
58        self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request))
59        self._context["http_request"] = request
60
61    def handle_error(self, exc: Exception, expression_source: str):
62        """Exception Handler"""
63        raise PolicyException(exc)
64
65    def evaluate(self, expression_source: str) -> PolicyResult:
66        """Parse and evaluate expression. Policy is expected to return a truthy object.
67        Messages can be added using 'do ak_message()'."""
68        try:
69            result = super().evaluate(expression_source)
70        except PolicyException as exc:
71            # PolicyExceptions should be propagated back to the process,
72            # which handles recording and returning a correct result
73            raise exc
74        except Exception as exc:  # noqa
75            LOGGER.warning("Expression error", exc=exc)
76            return PolicyResult(False, str(exc))
77        else:
78            policy_result = PolicyResult(False, *self._messages)
79            policy_result.raw_result = result
80            if result is None:
81                LOGGER.warning(
82                    "Expression policy returned None",
83                    src=expression_source,
84                    policy=self._filename,
85                )
86                policy_result.passing = False
87            if result:
88                policy_result.passing = bool(result)
89            return policy_result
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class PolicyEvaluator(authentik.lib.expression.evaluator.BaseEvaluator):
22class PolicyEvaluator(BaseEvaluator):
23    """Validate and evaluate python-based expressions"""
24
25    _messages: list[str]
26
27    policy: ExpressionPolicy | None = None
28
29    def __init__(self, policy_name: str | None = None):
30        super().__init__(policy_name or "PolicyEvaluator")
31        self._messages = []
32        # update website/docs/expressions/_objects.md
33        # update website/docs/expressions/_functions.md
34        self._context["ak_message"] = self.expr_func_message
35        self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator
36
37    def expr_func_message(self, message: str):
38        """Wrapper to append to messages list, which is returned with PolicyResult"""
39        self._messages.append(message)
40
41    def set_policy_request(self, request: PolicyRequest):
42        """Update context based on policy request (if http request is given, update that too)"""
43        # update website/docs/expressions/_objects.md
44        # update website/docs/expressions/_functions.md
45        self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False)
46        if request.http_request:
47            self.set_http_request(request.http_request)
48        self._context["request"] = request
49        self._context["context"] = request.context
50        if request.obj and isinstance(request.obj, Event):
51            self._context["ak_client_ip"] = ip_address(
52                request.obj.client_ip or ClientIPMiddleware.default_ip
53            )
54
55    def set_http_request(self, request: HttpRequest):
56        """Update context based on http request"""
57        # update website/docs/expressions/_objects.md
58        # update website/docs/expressions/_functions.md
59        self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request))
60        self._context["http_request"] = request
61
62    def handle_error(self, exc: Exception, expression_source: str):
63        """Exception Handler"""
64        raise PolicyException(exc)
65
66    def evaluate(self, expression_source: str) -> PolicyResult:
67        """Parse and evaluate expression. Policy is expected to return a truthy object.
68        Messages can be added using 'do ak_message()'."""
69        try:
70            result = super().evaluate(expression_source)
71        except PolicyException as exc:
72            # PolicyExceptions should be propagated back to the process,
73            # which handles recording and returning a correct result
74            raise exc
75        except Exception as exc:  # noqa
76            LOGGER.warning("Expression error", exc=exc)
77            return PolicyResult(False, str(exc))
78        else:
79            policy_result = PolicyResult(False, *self._messages)
80            policy_result.raw_result = result
81            if result is None:
82                LOGGER.warning(
83                    "Expression policy returned None",
84                    src=expression_source,
85                    policy=self._filename,
86                )
87                policy_result.passing = False
88            if result:
89                policy_result.passing = bool(result)
90            return policy_result

Validate and evaluate python-based expressions

PolicyEvaluator(policy_name: str | None = None)
29    def __init__(self, policy_name: str | None = None):
30        super().__init__(policy_name or "PolicyEvaluator")
31        self._messages = []
32        # update website/docs/expressions/_objects.md
33        # update website/docs/expressions/_functions.md
34        self._context["ak_message"] = self.expr_func_message
35        self._context["ak_user_has_authenticator"] = self.expr_func_user_has_authenticator
def expr_func_message(self, message: str):
37    def expr_func_message(self, message: str):
38        """Wrapper to append to messages list, which is returned with PolicyResult"""
39        self._messages.append(message)

Wrapper to append to messages list, which is returned with PolicyResult

def set_policy_request(self, request: authentik.policies.types.PolicyRequest):
41    def set_policy_request(self, request: PolicyRequest):
42        """Update context based on policy request (if http request is given, update that too)"""
43        # update website/docs/expressions/_objects.md
44        # update website/docs/expressions/_functions.md
45        self._context["ak_is_sso_flow"] = request.context.get(PLAN_CONTEXT_SSO, False)
46        if request.http_request:
47            self.set_http_request(request.http_request)
48        self._context["request"] = request
49        self._context["context"] = request.context
50        if request.obj and isinstance(request.obj, Event):
51            self._context["ak_client_ip"] = ip_address(
52                request.obj.client_ip or ClientIPMiddleware.default_ip
53            )

Update context based on policy request (if http request is given, update that too)

def set_http_request(self, request: django.http.request.HttpRequest):
55    def set_http_request(self, request: HttpRequest):
56        """Update context based on http request"""
57        # update website/docs/expressions/_objects.md
58        # update website/docs/expressions/_functions.md
59        self._context["ak_client_ip"] = ip_address(ClientIPMiddleware.get_client_ip(request))
60        self._context["http_request"] = request

Update context based on http request

def handle_error(self, exc: Exception, expression_source: str):
62    def handle_error(self, exc: Exception, expression_source: str):
63        """Exception Handler"""
64        raise PolicyException(exc)

Exception Handler

def evaluate(self, expression_source: str) -> authentik.policies.types.PolicyResult:
66    def evaluate(self, expression_source: str) -> PolicyResult:
67        """Parse and evaluate expression. Policy is expected to return a truthy object.
68        Messages can be added using 'do ak_message()'."""
69        try:
70            result = super().evaluate(expression_source)
71        except PolicyException as exc:
72            # PolicyExceptions should be propagated back to the process,
73            # which handles recording and returning a correct result
74            raise exc
75        except Exception as exc:  # noqa
76            LOGGER.warning("Expression error", exc=exc)
77            return PolicyResult(False, str(exc))
78        else:
79            policy_result = PolicyResult(False, *self._messages)
80            policy_result.raw_result = result
81            if result is None:
82                LOGGER.warning(
83                    "Expression policy returned None",
84                    src=expression_source,
85                    policy=self._filename,
86                )
87                policy_result.passing = False
88            if result:
89                policy_result.passing = bool(result)
90            return policy_result

Parse and evaluate expression. Policy is expected to return a truthy object. Messages can be added using 'do ak_message()'.